自从函数式编程和响应式编程逐渐进入到程序员的生活以后,map函数做为其中一个重要算子也为你们所熟知,不管是前端web开发,手机开发仍是后端服务器开发,都很难逃过它的手心。而在大数据领域中又每每能够见到另一个算子mapPartition的身影。在性能调优中,常常会被建议尽可能用 mappartition 操做去替代 map 操做。本文将从Flink源码和示例入手,为你们解析为何mapPartition比map更高效。html
Map的做用是将数据流上每一个元素转换为另外的元素,好比data.map { x => x.toInt }
。它把数组流
中的每个值,使用所提供的函数执行一遍,一一对应。获得与元素个数相同的数组流
。而后返回这个新数据流。前端
MapPartition的做用是单个函数调用并行分区,好比data.mapPartition { in => in map { (_, 1) } }
。该函数将分区做为“迭代器”,能够产生任意数量的结果。每一个分区中的元素数量取决于并行度和之前的operations。java
其实,二者完成的业务操做是同样的,本质上都是将数据流上每一个元素转换为另外的元素。程序员
区别主要在两点。web
从逻辑实现来说,sql
void mapPartition(Iterable<T> values, Collector<O> out)
。其中values是须要映射转换的全部记录,out是用来发送结果的collector。具体返回什么,如何操做out来返回结果,则彻底依赖于业务逻辑。从调用次数来讲,数据库
为何MapPartition有这么高效呢,下面咱们将具体论证。apache
首先咱们给出示例代码,从下文中咱们能够看出,map就是简单的转换,而mapPartition则不但要作转换,程序员还须要手动操做如何返回结果:编程
public class IteratePi { public static void main(String[] args) throws Exception { final ExecutionEnvironment env=ExecutionEnvironment.getExecutionEnvironment(); //迭代次数 int iterativeNum=10; DataSet<Integer> wordList = env.fromElements(1, 2, 3); IterativeDataSet<Integer> iterativeDataSet=wordList.iterate(iterativeNum); DataSet<Integer> mapResult=iterativeDataSet .map(new MapFunction<Integer, Integer>() { @Override public Integer map(Integer value) throws Exception { value += 1; return value; } }); //迭代结束的条件 DataSet<Integer> result=iterativeDataSet.closeWith(mapResult); result.print(); MapPartitionOperator<Integer, Integer> mapPartitionResult = iterativeDataSet .mapPartition(new MapPartitionFunction<Integer, Integer>() { @Override public void mapPartition(Iterable<Integer> values, Collector<Integer> out) { for (Integer value : values) { // 这里须要程序员自行决定如何返回,即调用collect操做。 out.collect(value + 2); } } } ); //迭代结束的条件 DataSet<Integer> partitionResult=iterativeDataSet.closeWith(mapPartitionResult); partitionResult.print(); } }
世界上不多有没有来由的爱,也少见免费的午饭。mapPartition之因此高效,其所依赖的基础就是Flink的传输机制。因此咱们下面就讲解下为何。后端
你们都知道,Spark是用微批处理来模拟流处理,就是说,spark仍是一批一批的传输和处理数据,因此咱们就能理解mapPartition的机制就是基于这一批数据作统一处理。这样确实能够高效。
可是Flink号称是纯流,即Flink是每来一个输入record,就进行一次业务处理,而后返回给下游算子。
有的兄弟就会产生疑问:每次都只是处理单个记录,怎么可以让mapPartition作到批次处理呢。其实这就是Flink的微妙之处:即Flink确实是每次都处理一个输入record,可是在上下游传输时候,Flink仍是把records累积起来作批量传输的。也能够这么理解:从传输的角度讲,Flink是微批处理的。
Flink 的网络栈是组成 flink-runtime 模块的核心组件之一,也是 Flink 做业的核心部分。全部来自 TaskManager 的工做单元(子任务)都经过它来互相链接。流式传输数据流都要通过网络栈,因此它对 Flink 做业的性能表现(包括吞吐量和延迟指标)相当重要。与经过 Akka 使用 RPC 的 TaskManager 和 JobManager 之间的协调通道相比,TaskManager 之间的网络栈依赖的是更底层的,基于 Netty 的 API。
一个运行的application的tasks在持续交换数据。TaskManager负责作数据传输。不一样任务之间的每一个(远程)网络链接将在 Flink 的网络栈中得到本身的 TCP 通道。可是若是同一任务的不一样子任务被安排到了同一个 TaskManager,则它们与同一个 TaskManager 的网络链接将被多路复用,并共享一个 TCP 信道以减小资源占用。
每一个TaskManager有一组网络缓冲池(默认每一个buffer是32KB),用于发送与接受数据。如发送端和接收端位于不一样的TaskManager进程中,则它们须要经过操做系统的网络栈进行交流。流应用须要以管道的模式进行数据交换,也就是说,每对TaskManager会维持一个永久的TCP链接用于作数据交换。在shuffle链接模式下(多个sender与多个receiver),每一个sender task须要向每一个receiver task发送数据,此时TaskManager须要为每一个receiver task都分配一个缓冲区。
一个记录被建立并传递以后(例如经过 Collector.collect()),它会被递交到RecordWriter,其未来自 Java 对象的记录序列化为一个字节序列,后者最终成为网络缓存。RecordWriter 首先使用SpanningRecordSerializer将记录序列化为一个灵活的堆上字节数组。而后它尝试将这些字节写入目标网络通道的关联网络缓存。
由于若是逐个发送会下降每一个记录的开销并带来更高的吞吐量,因此为了取得高吞吐量,TaskManager的网络组件首先从缓冲buffer中收集records,而后再发送。也就是说,records并非一个接一个的发送,而是先放入缓冲,而后再以batch的形式发送。这个技术能够高效使用网络资源,并达到高吞吐。相似于网络或磁盘 I/O 协议中使用的缓冲技术。
接收方网络栈(netty)将接收到的缓存写入适当的输入通道。最后(流式)任务的线程从这些队列中读取并尝试在RecordReader的帮助下,经过Deserializer将积累的数据反序列化为 Java 对象。
若sender与receiver任务都运行在同一个TaskManager进程,则sender任务会将发送的条目作序列化,并存入一个字节缓冲。而后将缓冲放入一个队列,直到队列被填满。
Receiver任务从队列中获取缓冲,并反序列化输入的条目。因此,在同一个TaskManager内,任务之间的数据传输并不通过网络交互。
即在同一个TaskManager进程内,也是批量传输。
咱们基于Flink优化的结果进行分析验证,看看Flink是否是把记录写入到buffer中,这种状况下运行的是CountingCollector和ChainedMapDriver。
copyFromSerializerToTargetChannel:153, RecordWriter (org.apache.flink.runtime.io.network.api.writer) emit:116, RecordWriter (org.apache.flink.runtime.io.network.api.writer) emit:60, ChannelSelectorRecordWriter (org.apache.flink.runtime.io.network.api.writer) collect:65, OutputCollector (org.apache.flink.runtime.operators.shipping) collect:35, CountingCollector (org.apache.flink.runtime.operators.util.metrics) collect:79, ChainedMapDriver (org.apache.flink.runtime.operators.chaining) collect:35, CountingCollector (org.apache.flink.runtime.operators.util.metrics) invoke:196, DataSourceTask (org.apache.flink.runtime.operators) doRun:707, Task (org.apache.flink.runtime.taskmanager) run:532, Task (org.apache.flink.runtime.taskmanager) run:748, Thread (java.lang)
当执行完用户定义的map函数以后,系统运行在 ChainedMapDriver.collect 函数。
public void collect(IT record) { this.outputCollector.collect(this.mapper.map(record));// mapper就是用户代码 }
而后调用到了CountingCollector.collect
public void collect(OUT record) { this.collector.collect(record);// record就是用户转换后的记录 }
OutputCollector.collect函数会把记录发送给全部的writers。
this.delegate.setInstance(record);// 先把record设置到SerializationDelegate中 for (RecordWriter<SerializationDelegate<T>> writer : writers) { // 全部的writer writer.emit(this.delegate); // 发送record }
RecordWriter
负责把数据序列化,而后写入到缓存中。它有两个实现类:
BroadcastRecordWriter
: 维护了多个下游channel,发送数据到下游全部的channel中。ChannelSelectorRecordWriter
: 经过channelSelector
对象判断数据须要发往下游的哪一个channel。咱们用的正是这个RecordWriter
。这里咱们分析下ChannelSelectorRecordWriter
的emit
方法:
public void emit(T record) throws IOException, InterruptedException { emit(record, channelSelector.selectChannel(record)); }
这里使用了channelSelector.selectChannel
方法。该方法为record寻找到对应下游channel id。
public class OutputEmitter<T> implements ChannelSelector<SerializationDelegate<T>> { public final int selectChannel(SerializationDelegate<T> record) { switch (strategy) { case FORWARD: return forward(); // 咱们代码用到了这种状况。这里 return 0; ...... } } }
接下来咱们又回到了父类RecordWriter.emit
。
protected void emit(T record, int targetChannel) throws IOException, InterruptedException { serializer.serializeRecord(record); // Make sure we don't hold onto the large intermediate serialization buffer for too long if (copyFromSerializerToTargetChannel(targetChannel)) { serializer.prune(); } }
关键的逻辑在于copyFromSerializerToTargetChannel
。此方法从序列化器中复制数据到目标channel,咱们能够看出来,每条记录都是写入到buffer中。
protected boolean copyFromSerializerToTargetChannel(int targetChannel) throws IOException, InterruptedException { // We should reset the initial position of the intermediate serialization buffer before // copying, so the serialization results can be copied to multiple target buffers. // 此处Serializer为SpanningRecordSerializer // reset方法将serializer内部的databuffer position重置为0 serializer.reset(); boolean pruneTriggered = false; // 获取目标channel的bufferBuilder // bufferBuilder内维护了MemorySegment,即内存片断 // Flink的内存管理依赖MemorySegment,可实现堆内堆外内存的管理 // RecordWriter内有一个bufferBuilder数组,长度和下游channel数目相同 // 该数组以channel ID为下标,存储和channel对应的bufferBuilder // 若是对应channel的bufferBuilder还没有建立,调用requestNewBufferBuilder申请一个新的bufferBuilder BufferBuilder bufferBuilder = getBufferBuilder(targetChannel); // 复制serializer的数据到bufferBuilder中 SerializationResult result = serializer.copyToBufferBuilder(bufferBuilder); // 循环直到result彻底被写入到buffer // 一条数据可能会被写入到多个缓存中 // 若是缓存不够用,会申请新的缓存 // 数据彻底写入完毕之时,当前正在操做的缓存是没有写满的 // 所以返回true,代表须要压缩该buffer的空间 while (result.isFullBuffer()) { finishBufferBuilder(bufferBuilder); // If this was a full record, we are done. Not breaking out of the loop at this point // will lead to another buffer request before breaking out (that would not be a // problem per se, but it can lead to stalls in the pipeline). if (result.isFullRecord()) { pruneTriggered = true; emptyCurrentBufferBuilder(targetChannel); break; } bufferBuilder = requestNewBufferBuilder(targetChannel); result = serializer.copyToBufferBuilder(bufferBuilder); } checkState(!serializer.hasSerializedData(), "All data should be written at once"); // 若是buffer超时时间为0,须要flush目标channel的数据 if (flushAlways) { flushTargetPartition(targetChannel); } return pruneTriggered; }
Driver是Flink runtime的一个重要概念,是在一个task中运行的用户业务逻辑组件,具体实现了批量操做代码。其内部API包括初始化,清除,运行,取消等逻辑。
public interface Driver<S extends Function, OT> { ...... void setup(TaskContext<S, OT> context); void run() throws Exception; void cleanup() throws Exception; void cancel() throws Exception; }
具体在 org.apache.flink.runtime.operators 目录下,咱们可以看到各类Driver的实现,基本的算子都有本身的Driver。
...... CoGroupDriver.java FlatMapDriver.java FullOuterJoinDriver.java GroupReduceCombineDriver.java GroupReduceDriver.java JoinDriver.java LeftOuterJoinDriver.java MapDriver.java MapPartitionDriver.java ......
map算子对应的就是MapDriver。
结合上节咱们知道,上游数据是经过batch方式批量传入的。因此,在run函数会遍历输入,每次取出一个record,而后调用用户自定义函数function.map对这个record作map操做。
public class MapDriver<IT, OT> implements Driver<MapFunction<IT, OT>, OT> { @Override public void run() throws Exception { final MutableObjectIterator<IT> input = this.taskContext.getInput(0); ..... else { IT record = null; // runtime主动进行循环,这样致使大量函数调用 while (this.running && ((record = input.next()) != null)) { numRecordsIn.inc(); output.collect(function.map(record)); // function是用户函数 } } } }
MapPartitionDriver是mapPartition的具体组件。系统会把获得的批量数据inIter一次性的都传给用户自定义函数,由用户代码来进行遍历操做。
public class MapPartitionDriver<IT, OT> implements Driver<MapPartitionFunction<IT, OT>, OT> { @Override public void run() throws Exception { final MutableObjectIterator<IT> input = new CountingMutableObjectIterator<>(this.taskContext.<IT>getInput(0), numRecordsIn); ...... } else { final NonReusingMutableToRegularIteratorWrapper<IT> inIter = new NonReusingMutableToRegularIteratorWrapper<IT>(input, this.taskContext.<IT>getInputSerializer(0).getSerializer()); // runtime不参与循环,这样能够减小函数调用 function.mapPartition(inIter, output); } } }
咱们可以看到map和mapPartition的input都是MutableObjectIterator
假设有上亿个数据须要map,这资源占用和运行速度效率差异会至关大。
以前提到了优化,这里咱们再详细深刻下如何优化map算子。
Flink有一个关键的优化技术称为任务链,用于(在某些状况下)减小本地通讯的过载。为了知足任务链的条件,至少两个以上的operator必须配置为同一并行度,而且使用本地向前的(local forwad)方式链接。任务链能够被认为是一种管道。
当管道以任务链的方式执行时候,Operators的函数被融合成单个任务,并由一个单独的线程执行。一个function产生的records,经过使用一个简单的方法调用,被递交给下一个function。因此这里在方法之间的records传递中,基本没有序列化以及通讯消耗。
针对优化后的Operator Chain,runtime对应的Driver则是ChainedMapDriver。这是经过 MAP(MapDriver.class, ChainedMapDriver.class, PIPELINED, 0)
, 映射获得的。
咱们能够看到,由于是任务链,因此每一个record是直接在管道中流淌 ,ChainedMapDriver连循环都省略了,直接map转换后丢给下游去也。
public class ChainedMapDriver<IT, OT> extends ChainedDriver<IT, OT> { private MapFunction<IT, OT> mapper; // 用户函数 @Override public void collect(IT record) { try { this.numRecordsIn.inc(); this.outputCollector.collect(this.mapper.map(record)); } catch (Exception ex) { throw new ExceptionInChainedStubException(this.taskName, ex); } } } // 这时的调用栈以下 map:23, UserFunc$1 (com.alibaba.alink) collect:79, ChainedMapDriver (org.apache.flink.runtime.operators.chaining) collect:35, CountingCollector (org.apache.flink.runtime.operators.util.metrics) invoke:196, DataSourceTask (org.apache.flink.runtime.operators) doRun:707, Task (org.apache.flink.runtime.taskmanager) run:532, Task (org.apache.flink.runtime.taskmanager) run:748, Thread (java.lang)
map和mapPartition实现的基础是Flink的数据传输机制 :Flink确实是每次都处理一个输入record,可是在上下游之间传输时候,Flink仍是把records累积起来作批量传输。便可以认为从数据传输模型角度讲,Flink是微批次的。
对于数据流转换,由于是批量传输,因此对于积累的records,map是在runtime Driver代码中进行循环,mapPartition在用户代码中进行循环。
map的函数调用次数要远高于mapPartition。若是在用户函数中涉及到频繁建立额外的对象或者外部资源操做,则mapPartition性能远远高出。
若是没有connection之类的操做,则一般性能差异并不大,一般不会成为瓶颈,也没有想象的那么严重。