flink学习系列--基础知识学习(四)

前言

这一讲将介绍一下序列化机制和过程函数(processfunction)。html

序列化机制

使用 Flink 编写处理逻辑时,新手老是容易被林林总总的概念所混淆:java

为何 Flink 有那么多的类型声明方式?
 BasicTypeInfo.STRING_TYPE_INFO、Types.STRING 、Types.STRING() 有何区别?
 TypeInfoFactory 又是什么?
 TypeInformation.of 和 TypeHint 是如何使用的呢?

接下来本文将逐步解密 Flink 的类型和序列化机制(TypeInformation)。git

clipboard.png

Flink 的类型系统源码位于 org.apache.flink.api.common.typeinfo 包,让咱们对上图TypeInformation深刻追踪,看一下类的继承关系图:github

clipboard.png

能够看到,上面两个图片是一一对应的,TypeInformation 类是描述一切类型的公共基类,它和它的全部子类必须可序列化(Serializable),由于类型信息将会伴随 Flink 的做业提交,被传递给每一个执行节点。apache

因为 Flink 本身管理内存,采用了一种很是紧凑的存储格式(见官方博文),于是类型信息在整个数据处理流程中属于相当重要的元数据。api

  • TypeExtractror 类型提取

Flink 内部实现了名为 TypeExtractror 的类,能够利用方法签名、子类信息等蛛丝马迹,自动提取和恢复类型信息(固然也能够显式声明,即本文所介绍的内容)。函数

然而因为 Java 的类型擦除,自动提取并非老是有效。于是一些状况下(例如经过 URLClassLoader 动态加载的类),仍需手动处理;例以下图中对 DataSet 变换时,使用 .returns() 方法声明返回类型。spa

这里须要说明一下,returns() 接受三种类型的参数:字符串描述的类名(例如 "String")、TypeHint(接下来会讲到,用于泛型类型参数)、Java 原生 Class(例如 String.class) 等;不过字符串形式的用法即将废弃,若是确实有必要,请使用 Class.forName() 等方法来解决。code

clipboard.png

  • 声明类型信息的常见手段

经过 TypeInformation.of() 方法,能够简单地建立类型信息对象。orm

1. 对于非泛型的类,直接传入 Class 对象便可

clipboard.png

2.对于泛型类,须要借助 TypeHint 来保存泛型类型信息

clipboard.png

3. 预约义的快捷方式

例如 BasicTypeInfo,这个类定义了一系列经常使用类型的快捷方式,对于 String、Boolean、Byte、Short、Integer、Long、Float、Double、Char 等基本类型的类型声明,能够直接使用。

clipboard.png

4. 自定义 TypeInfo 和 TypeInfoFactory

经过自定义 TypeInfo 为任意类提供 Flink 原生内存管理(而非 Kryo),可令存储更紧凑,运行时也更高效。
开发者在自定义类上使用 @TypeInfo 注解,随后建立相应的 TypeInfoFactory 并覆盖 createTypeInfo 方法。
注意须要继承 TypeInformation 类,为每一个字段定义类型,并覆盖元数据方法,例如是不是基本类型(isBasicType)、是不是 Tuple(isTupleType)、元数(对于一维的 Row 类型,等于字段的个数)等等,从而为 TypeExtractor 提供决策依据。
更多示例,请参考 Flink 源码的 org/apache/flink/api/java/typeutils/TypeInfoFactoryTest.java

clipboard.png

Kryo 序列化 待研究中...

相关文章
相关标签/搜索