往期推荐:api
Flink深刻浅出:内存模型源码分析
Flink深刻浅出:JDBC Source从理论到实战this
Flink深刻浅出:JDBC Connector源码分析设计
Flink的经典使用场景是ETL,即Extract抽取、Transform转换、Load加载,能够从一个或多个数据源读取数据,通过处理转换后,存储到另外一个地方,本篇将会介绍如何使用DataStream API来实现这种应用。注意Flink Table和SQL
api 会很适合来作ETL,可是不妨碍从底层的DataStream API来了解其中的细节。3d
无状态即不须要在操做中维护某个中间状态,典型的例子如map和flatmap。
下面是一个转换操做的例子,须要根据输入数据建立一个出租车起始位置和目标位置的对象。首先定义出租车的位置对象:
public static class EnrichedRide extends TaxiRide { public int startCell; public int endCell; public EnrichedRide() {} public EnrichedRide(TaxiRide ride) { this.rideId = ride.rideId; this.isStart = ride.isStart; ... this.startCell = GeoUtils.mapToGridCell(ride.startLon, ride.startLat); this.endCell = GeoUtils.mapToGridCell(ride.endLon, ride.endLat); } public String toString() { return super.toString() + "," + Integer.toString(this.startCell) + "," + Integer.toString(this.endCell); } }
使用的时候能够注册一个MapFunction,该函数接收TaxiRide对象,输出EnrichRide对象。
public static class Enrichment implements MapFunction<TaxiRide, EnrichedRide> { @Override public EnrichedRide map(TaxiRide taxiRide) throws Exception { return new EnrichedRide(taxiRide); } }
使用时只须要建立map对象便可:
DataStream<TaxiRide> rides = env.addSource(new TaxiRideSource(...)); DataStream<EnrichedRide> enrichedNYCRides = rides .filter(new RideCleansingSolution.NYCFilter()) .map(new Enrichment()); enrichedNYCRides.print();
MapFunction适合一对一的转换,对于输入流的每一个元素都有一个元素输出。若是须要一对多的场景,可使用flatmap:
DataStream<TaxiRide> rides = env.addSource(new TaxiRideSource(...)); DataStream<EnrichedRide> enrichedNYCRides = rides .flatMap(new NYCEnrichment()); enrichedNYCRides.print();
FlatMapFunction的定义:
public static class NYCEnrichment implements FlatMapFunction<TaxiRide, EnrichedRide> { @Override public void flatMap(TaxiRide taxiRide, Collector<EnrichedRide> out) throws Exception { FilterFunction<TaxiRide> valid = new RideCleansing.NYCFilter(); if (valid.filter(taxiRide)) { out.collect(new EnrichedRide(taxiRide)); } } }
经过collector,能够在flatmap中任意添加零个或多个元素。
有时须要对数据流按照某个字段进行分组,每一个事件会根据该字段相同的值汇总到一块儿。好比,但愿查找相同出发位置的路线。若是在SQL中可能会使用GROUP BY startCell,在Flink中能够直接使用keyBy函数:
rides .flatMap(new NYCEnrichment()) .keyBy(value -> value.startCell)
keyBy会引发重分区而致使网络数据shuffle,一般这种代价都很昂贵,由于每次shuffle时须要进行数据的序列化和反序列化,既浪费CPU资源,又占用网络带宽。
经过对startCell进行分组,这种方式的分组可能会因为编译器而丢失字段的类型信息,所以Flink也支持把字段包装成Tuple,基于元素位置进行分组。固然也支持使用KeySelector函数,自定义分组规则。
rides .flatMap(new NYCEnrichment()) .keyBy( new KeySelector<EnrichedRide, int>() { @Override public int getKey(EnrichedRide enrichedRide) throws Exception { return enrichedRide.startCell; } })
能够直接使用lambda表达式:
rides .flatMap(new NYCEnrichment()) .keyBy(enrichedRide -> enrichedRide.startCell)
keyselector不限制从必须从事件中抽取key,也能够自定义任何计算key的方法。但须要保证输出的key是一致的,而且实现了对应的hashCode和equals方法。生成key的规则必定要稳定,由于生成key可能在应用运行的任什么时候间,所以必定要保证key生成规则的持续稳定。
key能够经过某个字段选择:
keyBy(enrichedRide -> enrichedRide.startCell)
也能够直接替换成某个方法:
keyBy(ride -> GeoUtils.mapToGridCell(ride.startLon, ride.startLat))
下面的例子中,建立了一个包含startCell和花费时间的二元组:
import org.joda.time.Interval; DataStream<Tuple2<Integer, Minutes>> minutesByStartCell = enrichedNYCRides .flatMap(new FlatMapFunction<EnrichedRide, Tuple2<Integer, Minutes>>() { @Override public void flatMap(EnrichedRide ride, Collector<Tuple2<Integer, Minutes>> out) throws Exception { if (!ride.isStart) { Interval rideInterval = new Interval(ride.startTime, ride.endTime); Minutes duration = rideInterval.toDuration().toStandardMinutes(); out.collect(new Tuple2<>(ride.startCell, duration)); } } });
如今须要输出每一个起始位置最长距离的路线,有不少种方式能够实现。以上面的数据为例,能够经过startcell进行聚合,而后选择时间最大的元素输出:
minutesByStartCell .keyBy(value -> value.f0) // .keyBy(value -> value.startCell) .maxBy(1) // duration .print();
能够获得输出结果:
4> (64549,5M) 4> (46298,18M) 1> (51549,14M) 1> (53043,13M) 1> (56031,22M) 1> (50797,6M) ... 1> (50797,8M) ... 1> (50797,11M) ... 1> (50797,12M)
上面是一个有状态的例子,Flink须要记录每一个key的最大值。不管什么时候在应用中涉及到状态,都须要考虑这个状态有多大。若是key的空间是无限大的,那么flink可能须要维护大量的状态信息。当使用流时,必定要对无限窗口的聚合十分敏感,由于它是对整个流进行操做,颇有可能由于维护的状态信息不断膨胀,而致使内存溢出。在上面使用的maxBy就是经典的的聚合操做,也可使用更通用的reduce来自定义聚合方法。
Flink针对状态的管理有不少易用的特性,好比:
Flink有几种函数接口,包括FilterFunction, MapFunction,FlatMapFunction等。对于每一个接口,Flink都提供了对应的Rich方法。好比RichFlatMapFunction,提供了额外的一些方法:
下面是一个针对事件的key进行去重的例子:
private static class Event { public final String key; public final long timestamp; ... } public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.addSource(new EventSource()) .keyBy(e -> e.key) .flatMap(new Deduplicator()) .print(); env.execute(); }
为了实现这个功能,deduplicator须要记住一些信息,对于每一个key,都须要记录是否已经存在。Flink支持几种不一样类型的状态,最简单的一种是valueState。对于每一个key,flink都为它保存一个对象,在上面的例子中对象是Boolean。Deduplicator有两个方法:open()和flatMap()。open方法经过descriptor为状态起了一个标识名称,并声明类型为Boolean。
public static class Deduplicator extends RichFlatMapFunction<Event, Event> { ValueState<Boolean> keyHasBeenSeen; @Override public void open(Configuration conf) { ValueStateDescriptor<Boolean> desc = new ValueStateDescriptor<>("keyHasBeenSeen", Types.BOOLEAN); keyHasBeenSeen = getRuntimeContext().getState(desc); } @Override public void flatMap(Event event, Collector<Event> out) throws Exception { if (keyHasBeenSeen.value() == null) { out.collect(event); keyHasBeenSeen.update(true); } } }
flatMap中调用state.value()获取状态。flink在上下文中为每一个key保存了一个状态值,只有当值为null时,说明这个key以前没有出现过,而后将其更新为true。当flink调用open时,状态是空的。可是当调用flatMap时,key能够经过context进行访问。当在集群模式中运行时,会有不少个Deduplicator实例,每一个负责维护一部分key的事件。所以,当使用单个事件的valuestate时,要理解它背后其实不是一个值,而是每一个key都对应一个状态值,而且分布式的存储在集群中的各个节点进程上。
有时候key的空间多是无限制的,flink会为每一个key存储一个boolean对象。若是key的数量是有限的还好,可是应用每每是持续不间断的运行,那么key可能会无限增加,所以须要清理再也不使用的key。能够经过state.clear()
进行清理。好比针对某个key按照某一时间频率进行清理,在processFunction中能够了解到如何在事件驱动的应用中执行定时器操做。也能够在状态描述符中为状态设置TTL生存时间,这样状态能够自动进行清理。
状态也支持在非key类型的上下文中使用,这种叫作操做符状态,operator state。典型的场景是Flink读取Kafka时记录的offset信息。
大部分场景中Flink都是接收一个数据流输出一个数据流,相似管道式的处理数据:
也有的场景须要动态的修改函数中的信息,好比阈值、规则或者其余的参数,这种设计叫作connected streams,流会拥有两个输入,相似:
在下面的例子中,经过控制流用来指定必须过滤的单词:
public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<String> control = env.fromElements("DROP", "IGNORE").keyBy(x -> x); DataStream<String> streamOfWords = env.fromElements("Apache", "DROP", "Flink", "IGNORE").keyBy(x -> x); control .connect(datastreamOfWords) .flatMap(new ControlFunction()) .print(); env.execute(); }
两个流能够经过key的方式链接,keyby用来分组数据,这样保证相同类型的数据能够进入到相同的实例中。上面的例子两个流都是字符串,
public static class ControlFunction extends RichCoFlatMapFunction<String, String, String> { private ValueState<Boolean> blocked; @Override public void open(Configuration config) { blocked = getRuntimeContext().getState(new ValueStateDescriptor<>("blocked", Boolean.class)); } @Override public void flatMap1(String control_value, Collector<String> out) throws Exception { blocked.update(Boolean.TRUE); } @Override public void flatMap2(String data_value, Collector<String> out) throws Exception { if (blocked.value() == null) { out.collect(data_value); } } }
blocked用于记录key的控制逻辑,key的state会在两个流间共享。flatMap1和flatMap2会被两个流调用,分别用来更新和获取状态,从而实现经过一个流控制另外一个流的目的。
总结:本片从状态上讲述了有状态的操做和无状态的操做,还介绍了状态的使用以及链接流的适用场景。后面会介绍DataStream的操做和状态的管理。