上一篇介绍了编写 Flink 程序的基本步骤,以及一些常见 API,如:map、filter、keyBy 等,重点介绍了 keyBy 方法。本篇将继续介绍 Flink 中经常使用的 API,主要内容为apache
许多 transform 操做须要用户自定义函数来实现,Flink 支持多种自定义 transform 函数,接下来一一介绍。编程
/** * 实现 MapFunction 接口 * 其中泛型的第一 String 表明输入类型,第二个 Integer 表明输出类型 */ class MyMapFunction implements MapFunction<String, Integer> { @Override public Integer map(String value) { return Integer.parseInt(value); } } //使用 transform 函数 data.map(new MyMapFunction());
data.map(new MapFunction<String, Integer> () { public Integer map(String value) { return Integer.parseInt(value); } });
匿名类是 Java 语言定义的语法,与 “实现接口” 的方式同样,只不过不须要显示定义子类。这种方式比 “实现接口” 更常见一些。json
data.map(s -> Integer.parseInt(s)); //或者 data.map(Integer::parseInt);
Java 8 支持 Lambda 表达式,用法与 Scala 语法很像, 写起来简洁,而且容易维护,推荐使用这种方式。数组
顾名思义,比普通的 transform 函数要更丰富,额外提供了 4 个方法:open、close、getRuntimeContext 和 setRuntimeContext。它们能够用来建立/初始化本地状态、访问广播变量、访问累加器和计数器等。感受有点像 Hadoop 中的 Mapper 或者 Reducer 类。实现上,可使用自定义类继承 RichMapFunction 类的方式app
/** * 与实现 MapFunction 接口相似,这里是继承了 RichMapFunction 类 * 同时能够实现父类更多的方法 */ class MyRichMapFunction extends RichMapFunction<String, Integer> { @Override public void open(Configuration parameters) throws Exception { super.open(parameters); } @Override public RuntimeContext getRuntimeContext() { return super.getRuntimeContext(); } @Override public void setRuntimeContext(RuntimeContext t) { super.setRuntimeContext(t); } @Override public Integer map(String value) throws Exception { return Integer.parseInt(value); } @Override public void close() throws Exception { super.close(); } } data.map(new MyRichMapFunction());
也可使用匿名类的方式框架
data.map (new RichMapFunction<String, Integer>() { @Override public void open(Configuration parameters) throws Exception { super.open(parameters); } @Override public RuntimeContext getRuntimeContext() { return super.getRuntimeContext(); } @Override public void setRuntimeContext(RuntimeContext t) { super.setRuntimeContext(t); } @Override public Integer map(String value) { return Integer.parseInt(value); } @Override public void close() throws Exception { super.close(); } });
若是在 rich function 中须要写较多的业务,那么用匿名类的方式并不简洁,而且可读性差。分布式
目前 Flink 支持 6 种数据类型ide
Tuple (元组)是一个混合类型,包含固定数量的属性,而且每一个属性类型能够不一样。例如:二元组有 2 个属性,类名为 Tuple2;三元组有 3 个属性,类名为 Tuple3,以此类推。Java 支持的元组为 Tuple1 - Tuple25。访问属性能够经过属性名直接访问,如:tuple.f4 表明 tuple 的第 5 个属性。或者使用 tuple.getField(int position) 方法,参数 position 从 0 开始。函数
/** * Tuple2 二元组做为 DataStream 的输入类型 */ DataStream<Tuple2<String, Integer>> wordCounts = env.fromElements( new Tuple2<String, Integer>("hello", 1), new Tuple2<String, Integer>("world", 2)); wordCounts.map(new MapFunction<Tuple2<String, Integer>, Integer>() { @Override public Integer map(Tuple2<String, Integer> value) throws Exception { return value.f1; } });
POJO(Plain Ordinary Java Object) 叫作简单的 Java 对象。知足如下条件的 Java 或 Scala 类会被 Flink 看作 POJO 类型oop
POJO 类型更易使用,且 Flink 更高效地处理 POJO 类型的数据。
public class WordWithCount { public String word; public int count; public WordWithCount() {} public WordWithCount(String word, int count) { this.word = word; this.count = count; } } DataStream<WordWithCount> wordCounts = env.fromElements( new WordWithCount("hello", 1), new WordWithCount("world", 2));
Flink 支持 Java 和 Scala 中全部的原子类型,例如: Integer、String 和 Double 等。
不是 POJO 类型的类都会被 Flink 看作是普通的类类型。Flink 将它们视为黑盒且不会访问它们的内容,普通类类型使用 Kryo 进行序列化与反序列化。这里是第二次提到序列化与反序列化,简单解释下这个概念。由于在分布式计算的系统中,不可避免要在不一样机器之间传输数据,所以为了高效传输数据且在不一样语言之间互相转化,须要经过某种协议(protobuf、kryo、avro、json)将对象转化成另一种形式(序列化),其余机器接到序列化的数据后再转化成以前的对象(反序列化)就能够正常使用了。
不一样于通常的序列化框架,Values 类型经过实现 org.apache.flinktypes.Value 接口里的 write 和 read 方法,实现本身的序列化和反序列化逻辑。当通常的序列化框架不够高效的时候,可使用 Values 类型。例如:对于一个用数组存储的稀疏向量。因为数组大多数元素为 0 ,能够仅对非 0 元素进行特殊编码,而通常的序列化框架会对全部元素进行序列化操做。
Flink 已经预约义了几种 Value 类型与基本数据类型相对应。如:ByteValue, ShortValue, IntValue, LongValue, FloatValue, DoubleValue, StringValue, CharValue, BooleanValue。这些 Value 类型能够看作是基本数据类型的变体,他们的值是可变的,容许程序重复利用对象,减轻 GC 的压力。例如:Java 基本数据类型 String 是不可变的,可是 Flink 的 StringValue 类型是可变的。
Flink 定义的 Value 类型与 Hadoop Writable 类型类似,本质都是经过改进基本数据类型的缺点,提供系统总体性能。
Hadoop Writable 类型也是手动实现了比较高效的序列化与反序列化的逻辑。Value 类型实现了 org.apache.finktypes.Value 接口,而 Hadoop Writable 类型实现了 org.apache.hadoop.Writable 接口,该接口定义了 write 和 readFields 方法用来手动实现序列化与反序列化逻辑。
特殊类型包括 Scala 中的 Either, Option, and Try 类型,以及 Java API 中的 Either 类型。
累加器能够经过 add 操做,对程序中的某些状态或者操做进行计数,job 结束后会返回计数的结果。累加器能够用来调试或者记录信息。
能够自定义累加器,须要实现 Accumulator 接口,固然 Flink 提供了两种内置的累加器
使用累加器的步骤以下:
private IntCounter numLines = new IntCounter();
getRuntimeContext().addAccumulator("num-lines", this.numLines);
this.numLines.add(1);
myJobExecutionResult.getAccumulatorResult("num-lines")
Job 结束后,累加器的最终值存储在 JobExecutionResult
对象中,能够经过 execute 方法返回值来获取 JobExecutionResult
对象。可是对于批处理没法使用调用这个方法(官网没有提到),能够经过 env.getLastJobExecutionResult 方法获取。下面是使用累加器的完整示例
public static void main(String[] args) throws Exception { // set up the batch execution environment final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSet<String> data = env.readTextFile("你的输入路径"); //使用 rich function transform 函数 DataSet<Integer> dataSet = data.map(new MyRichMapFunction()); // 执行程序 dataSet.collect(); // 得到 job 的结果 JobExecutionResult jobExecutionResult = env.getLastJobExecutionResult(); int res = jobExecutionResult.getAccumulatorResult("num-lines"); // 输出累加器的值 System.out.println(res); } // 自定义 rich function /** * 与实现 MapFunction 接口相似,这里是继承了 RichMapFunction 类 * 同时能够实现父类更多的方法 */ class MyRichMapFunction extends RichMapFunction<String, Integer> { /** * 定义累加器 */ private IntCounter numLines = new IntCounter(); @Override public void open(Configuration parameters) throws Exception { // 注册累加器 getRuntimeContext().addAccumulator("num-lines", this.numLines); } @Override public Integer map(String value) throws Exception { // 累加器自增,记录处理的行数 this.numLines.add(1); return Integer.parseInt(value); } }
Flink 基本 API 的使用介绍完了,本篇主要介绍了自定义的 transform 函数、Flink 支持的数据类型和累加器。后续会详细介绍 Flink 的原理、机制以及编程模型。
欢迎关注公众号「渡码」