在第一篇介绍 Flink 的文章 《《从0到1学习Flink》—— Apache Flink 介绍》 中就说过 Flink 程序的结构windows
Flink 应用程序结构就是如上图所示:网络
一、Source: 数据源,Flink 在流处理和批处理上的 source 大概有 4 类:基于本地集合的 source、基于文件的 source、基于网络套接字的 source、自定义的 source。自定义的 source 常见的有 Apache kafka、Amazon Kinesis Streams、RabbitMQ、Twitter Streaming API、Apache NiFi 等,固然你也能够定义本身的 source。session
二、Transformation:数据转换的各类操做,有 Map / FlatMap / Filter / KeyBy / Reduce / Fold / Aggregations / Window / WindowAll / Union / Window join / Split / Select / Project 等,操做不少,能够将数据转换计算成你想要的数据。app
三、Sink:接收器,Flink 将转换计算后的数据发送的地点 ,你可能须要存储下来,Flink 常见的 Sink 大概有以下几类:写入文件、打印出来、写入 socket 、自定义的 sink 。自定义的 sink 常见的有 Apache kafka、RabbitMQ、MySQL、ElasticSearch、Apache Cassandra、Hadoop FileSystem 等,同理你也能够定义本身的 Sink。socket
在上四篇文章介绍了 Source 和 Sink:ide
一、《从0到1学习Flink》—— Data Source 介绍函数
二、《从0到1学习Flink》—— 如何自定义 Data Source ?oop
三、《从0到1学习Flink》—— Data Sink 介绍学习
四、《从0到1学习Flink》—— 如何自定义 Data Sink ?spa
那么这篇文章咱们就来看下 Flink Data Transformation 吧,数据转换操做仍是蛮多的,须要好好讲讲!
这是最简单的转换之一,其中输入是一个数据流,输出的也是一个数据流:
仍是拿上一篇文章的案例来将数据进行 map 转换操做:
1 2 3 4 5 6 7 8 9 10 11 12 |
SingleOutputStreamOperator<Student> map = student.map(new MapFunction<Student, Student>() { @Override public Student map(Student value) throws Exception { Student s1 = new Student(); s1.id = value.id; s1.name = value.name; s1.password = value.password; s1.age = value.age + 5; return s1; } }); map.print(); |
将每一个人的年龄都增长 5 岁,其余不变。
FlatMap 采用一条记录并输出零个,一个或多个记录。
1 2 3 4 5 6 7 8 9 |
SingleOutputStreamOperator<Student> flatMap = student.flatMap(new FlatMapFunction<Student, Student>() { @Override public void flatMap(Student value, Collector<Student> out) throws Exception { if (value.id % 2 == 0) { out.collect(value); } } }); flatMap.print(); |
这里将 id 为偶数的汇集出来。
Filter 函数根据条件判断出结果。
1 2 3 4 5 6 7 8 9 10 |
SingleOutputStreamOperator<Student> filter = student.filter(new FilterFunction<Student>() { @Override public boolean filter(Student value) throws Exception { if (value.id > 95) { return true; } return false; } }); filter.print(); |
这里将 id 大于 95 的过滤出来,而后打印出来。
KeyBy 在逻辑上是基于 key 对流进行分区。在内部,它使用 hash 函数对流进行分区。它返回 KeyedDataStream 数据流。
1 2 3 4 5 6 7 |
KeyedStream<Student, Integer> keyBy = student.keyBy(new KeySelector<Student, Integer>() { @Override public Integer getKey(Student value) throws Exception { return value.age; } }); keyBy.print(); |
上面对 student 的 age 作 KeyBy 操做分区
Reduce 返回单个的结果值,而且 reduce 操做每处理一个元素老是建立一个新值。经常使用的方法有 average, sum, min, max, count,使用 reduce 方法均可实现。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
SingleOutputStreamOperator<Student> reduce = student.keyBy(new KeySelector<Student, Integer>() { @Override public Integer getKey(Student value) throws Exception { return value.age; } }).reduce(new ReduceFunction<Student>() { @Override public Student reduce(Student value1, Student value2) throws Exception { Student student1 = new Student(); student1.name = value1.name + value2.name; student1.id = (value1.id + value2.id) / 2; student1.password = value1.password + value2.password; student1.age = (value1.age + value2.age) / 2; return student1; } }); reduce.print(); |
上面先将数据流进行 keyby 操做,由于执行 reduce 操做只能是 KeyedStream,而后将 student 对象的 age 作了一个求平均值的操做。
Fold 经过将最后一个文件夹流与当前记录组合来推出 KeyedStream。 它会发回数据流。
1 2 3 4 5 6 |
KeyedStream.fold("1", new FoldFunction<Integer, String>() { @Override public String fold(String accumulator, Integer value) throws Exception { return accumulator + "=" + value; } }) |
DataStream API 支持各类聚合,例如 min,max,sum 等。 这些函数能够应用于 KeyedStream 以得到 Aggregations 聚合。
1 2 3 4 5 6 7 8 9 10 |
KeyedStream.sum(0) KeyedStream.sum("key") KeyedStream.min(0) KeyedStream.min("key") KeyedStream.max(0) KeyedStream.max("key") KeyedStream.minBy(0) KeyedStream.minBy("key") KeyedStream.maxBy(0) KeyedStream.maxBy("key") |
max 和 maxBy 之间的区别在于 max 返回流中的最大值,但 maxBy 返回具备最大值的键, min 和 minBy 同理。
Window 函数容许按时间或其余条件对现有 KeyedStream 进行分组。 如下是以 10 秒的时间窗口聚合:
1 |
inputStream.keyBy(0).window(Time.seconds(10)); |
Flink 定义数据片断以便(可能)处理无限数据流。 这些切片称为窗口。 此切片有助于经过应用转换处理数据块。 要对流进行窗口化,咱们须要分配一个能够进行分发的键和一个描述要对窗口化流执行哪些转换的函数
要将流切片到窗口,咱们可使用 Flink 自带的窗口分配器。 咱们有选项,如 tumbling windows, sliding windows, global 和 session windows。 Flink 还容许您经过扩展 WindowAssginer 类来编写自定义窗口分配器。 这里先预留下篇文章来说解这些不一样的 windows 是如何工做的。
windowAll 函数容许对常规数据流进行分组。 一般,这是非并行数据转换,由于它在非分区数据流上运行。
与常规数据流功能相似,咱们也有窗口数据流功能。 惟一的区别是它们处理窗口数据流。 因此窗口缩小就像 Reduce 函数同样,Window fold 就像 Fold 函数同样,而且还有聚合。
1 |
inputStream.keyBy(0).windowAll(Time.seconds(10)); |
Union 函数将两个或多个数据流结合在一块儿。 这样就能够并行地组合数据流。 若是咱们将一个流与自身组合,那么它会输出每一个记录两次。
1 |
inputStream.union(inputStream1, inputStream2, ...); |
咱们能够经过一些 key 将同一个 window 的两个数据流 join 起来。
1 2 3 4 |
inputStream.join(inputStream1) .where(0).equalTo(1) .window(Time.seconds(5)) .apply (new JoinFunction () {...}); |
以上示例是在 5 秒的窗口中链接两个流,其中第一个流的第一个属性的链接条件等于另外一个流的第二个属性。
此功能根据条件将流拆分为两个或多个流。 当您得到混合流而且您可能但愿单独处理每一个数据流时,可使用此方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 |
SplitStream<Integer> split = inputStream.split(new OutputSelector<Integer>() { @Override public Iterable<String> select(Integer value) { List<String> output = new ArrayList<String>(); if (value % 2 == 0) { output.add("even"); } else { output.add("odd"); } return output; } }); |
此功能容许您从拆分流中选择特定流。
1 2 3 4 |
SplitStream<Integer> split; DataStream<Integer> even = split.select("even"); DataStream<Integer> odd = split.select("odd"); DataStream<Integer> all = split.select("even","odd"); |
Project 函数容许您从事件流中选择属性子集,并仅将所选元素发送到下一个处理流。
1 2 |
DataStream<Tuple4<Integer, Double, String, String>> in = // [...] DataStream<Tuple2<String, String>> out = in.project(3,2); |
上述函数从给定记录中选择属性号 2 和 3。 如下是示例输入和输出记录:
1 2 |
(1,10.0,A,B)=> (B,A) (2,20.0,C,D)=> (D,C) |
本文主要介绍了 Flink Data 的经常使用转换方式:Map、FlatMap、Filter、KeyBy、Reduce、Fold、Aggregations、Window、WindowAll、Union、Window Join、Split、Select、Project 等。并用了点简单的 demo 介绍了如何使用,具体在项目中该如何将数据流转换成咱们想要的格式,还须要根据实际状况对待。