做为一个编程新手,我不再怕Flink迷了个人眼!

欢迎你们前往腾讯云+社区,获取更多腾讯海量技术实践干货哦~html

本文由kyledong发表于云+社区专栏java

使用 Flink 编写处理逻辑时,新手老是容易被林林总总的概念所混淆:git

为何 Flink 有那么多的类型声明方式?github

BasicTypeInfo.STRING_TYPE_INFO、Types.STRING 、Types.STRING() 有何区别?apache

TypeInfoFactory 又是什么?api

TypeInformation.of 和 TypeHint 是如何使用的呢?机器学习

接下来本文将逐步解密 Flink 的类型和序列化机制。函数

img图 1:Flink 类型分类性能

Flink 的类型系统源码位于 org.apache.flink.api.common.typeinfo 包,让咱们对图 1 深刻追踪,看一下类的继承关系图:学习

img图 2:TypeInformation 类继承关系图

能够看到,图 1 和 图 2 是一一对应的,TypeInformation 类是描述一切类型的公共基类,它和它的全部子类必须可序列化(Serializable),由于类型信息将会伴随 Flink 的做业提交,被传递给每一个执行节点。

因为 Flink 本身管理内存,采用了一种很是紧凑的存储格式(见官方博文),于是类型信息在整个数据处理流程中属于相当重要的元数据。

TypeExtractror 类型提取

Flink 内部实现了名为 TypeExtractror 的类,能够利用方法签名、子类信息等蛛丝马迹,自动提取和恢复类型信息(固然也能够显式声明,即本文所介绍的内容)。

然而因为 Java 的类型擦除,自动提取并非老是有效。于是一些状况下(例如经过 URLClassLoader 动态加载的类),仍需手动处理;例以下图中对 DataSet 变换时,使用 .returns() 方法声明返回类型。

这里须要说明一下,returns() 接受三种类型的参数:字符串描述的类名(例如 "String")、TypeHint(接下来会讲到,用于泛型类型参数)、Java 原生 Class(例如 String.class) 等;不过字符串形式的用法即将废弃,若是确实有必要,请使用 Class.forName() 等方法来解决。

img图 3:使用 .returns 方法声明返回类型

下面是 ExecutionEnvironment 类的 registerType 方法,它能够向 Flink 注册子类信息(Flink 认识父类,但不必定认识子类的一些独特特性,于是须要注册),下面是 Flink-ML 机器学习库代码的例子:

img图 4:Flink-ML 注册子类类型信息

从下图能够看到,若是经过 TypeExtractor.createTypeInfo(type) 方法获取到的类型信息属于 PojoTypeInfo 及其子类,那么将其注册到一块儿;不然统一交给 Kryo 去处理,Flink 并不过问(这种状况下性能会变差)。

img图 5:Flink 容许注册自定义类型

声明类型信息的常见手段

经过 TypeInformation.of() 方法,能够简单地建立类型信息对象。

1. 对于非泛型的类,直接传入 Class 对象便可

img图 6:class 对象做为参数

2. 对于泛型类,须要借助 TypeHint 来保存泛型类型信息

TypeHint 的原理是建立匿名子类,运行时 TypeExtractor 能够经过 getGenericSuperclass(). getActualTypeArguments() 方法获取保存的实际类型。

img图 7:TypeHint 做为参数,保存泛型信息

3. 预约义的快捷方式

例如 BasicTypeInfo,这个类定义了一系列经常使用类型的快捷方式,对于 String、Boolean、Byte、Short、Integer、Long、Float、Double、Char 等基本类型的类型声明,能够直接使用。

img图 8:BasicTypeInfo 快捷方式

例以下面是对 Row 类型各字段的类型声明,使用方法很是简明,再也不须要 new XxxTypeInfo<>(不少不少参数)

img图 9:使用 BasicTypeInfo 快捷方式来声明一行(Row)每一个字段的类型信息

固然,若是以为 BasicTypeInfo 仍是太长,Flink 还提供了彻底等价的 Types 类(org.apache.flink.api.common.typeinfo.Types):

img图 10:Types 类

特别须要注意的是,flink-table 模块也有一个 Types 类(org.apache.flink.table.api.Types),用于 table 模块内部的类型定义信息,用法稍有不一样。使用 IDE 的自动 import 时必定要当心:

img图 11:flink-table 模块的 Types 类

4. 自定义 TypeInfo 和 TypeInfoFactory

经过自定义 TypeInfo 为任意类提供 Flink 原生内存管理(而非 Kryo),可令存储更紧凑,运行时也更高效。

开发者在自定义类上使用 @TypeInfo 注解,随后建立相应的 TypeInfoFactory 并覆盖 createTypeInfo 方法。

注意须要继承 TypeInformation 类,为每一个字段定义类型,并覆盖元数据方法,例如是不是基本类型(isBasicType)、是不是 Tuple(isTupleType)、元数(对于一维的 Row 类型,等于字段的个数)等等,从而为 TypeExtractor 提供决策依据。

img图 12:为自定义类提供类型支持(图片未展现所有字段)

更多示例,请参考 Flink 源码的 org/apache/flink/api/java/typeutils/TypeInfoFactoryTest.java

TypeSerializer

Flink 自带了不少 TypeSerializer 子类,大多数状况下各类自定义类型都是经常使用类型的排列组合,于是能够直接复用:

img图 13:Flink 自带的 TypeSerializer 子类概览

若是不能知足,那么能够继承 TypeSerializer 及其子类以实现本身的序列化器。

Kryo 序列化

对于 Flink 没法序列化的类型(例如用户自定义类型,没有 registerType,也没有自定义 TypeInfo 和 TypeInfoFactory),默认会交给 Kryo 处理。

若是 Kryo 仍然没法处理(例如 Guava、Thrift、Protobuf 等第三方库的一些类),有如下两种解决方案:

\1. 能够强制使用 Avro 来替代 Kryo:

env.getConfig().enableForceAvro();   // env 表明 ExecutionEnvironment 对象, 下同

\2. 为 Kryo 增长自定义的 Serializer 以加强 Kryo 的功能:

env.getConfig().addDefaultKryoSerializer(Class<?> type, Class<? extends Serializer<?>> serializerClass

img图 14:为 Kryo 增长自定义的 Serializer

以及

env.getConfig().registerTypeWithKryoSerializer(Class<?> type, T serializer)

img图 15:为 Kryo 增长自定义的 Serializer

若是但愿彻底禁用 Kryo(100% 使用 Flink 的序列化机制),则可使用如下设置,但注意一切没法处理的类都将致使异常:

env.getConfig().disableGenericTypes();

类型机制的陷阱与缺陷

金无足赤,人无完人。Flink 内置的类型系统虽然强大而灵活,但仍然有一些须要注意的点:

1. Lambda 函数的类型提取

因为 Flink 类型提取依赖于继承等机制,而 lambda 函数比较特殊,它是匿名的,也没有与之相关的类,因此其类型信息较难获取。

Eclipse 的 JDT 编译器会把 lambda 函数的泛型签名等信息写入编译后的字节码中,而对于 javac 等常见的其余编译器,则不会这样作,于是 Flink 就没法获取具体类型信息了。

2. Kryo 的 JavaSerializer 在 Flink 下存在 Bug

推荐使用 org.apache.flink.api.java.typeutils.runtime.kryo.JavaSerializer 而非 com.esotericsoftware.kryo.serializers.JavaSerializer 以防止与 Flink 不兼容。

类型机制与内存管理

img图 16:类型信息到内存块

下面以 StringSerializer 为例,来看下 Flink 是如何紧凑管理内存的:

img图 17:StringSerializer 类的 serialize() 方法

下面是具体的序列化过程:

img图 18:String 对象的序列化过程

能够看到,Flink 对于内存管理是很是细致的,井井有条,代码也容易理解。

参考阅读

Data Types & Serialization

Flink 原理与实现:内存管理

Flink 的数据类型和序列化

问答
如何使用Flink Quickstart在Eclipse IDE中缺乏依赖关系?
相关阅读
Storm做业转化为Flink做业流程分析
Apache Calcite 功能简析及在 Flink 的应用
【每日课程推荐】机器学习实战!快速入门在线广告业务及CTR相应知识

此文已由做者受权腾讯云+社区发布,更多原文请点击

搜索关注公众号「云加社区」,第一时间获取技术干货,关注后回复1024 送你一份技术课程大礼包!

海量技术实践经验,尽在云加社区

相关文章
相关标签/搜索