Flink 物理分区

本文来自官网: https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/#physical-partitioningjava

Flink还经过如下函数对转换后的数据精确流分区进行低级控制(若是须要)。apache

一、自定义分区json

  使用用户定义的分区程序为每一个元素选择目标任务。性能优化

dataStream.partitionCustom(partitioner, "someKey") dataStream.partitionCustom(partitioner, 0)

如简单的hash 分区(下面的实例不是官网):网络

val input = env.addSource(source) .map(json => { // json : {"id" : 0, "createTime" : "2019-08-24 11:13:14.942", "amt" : "9.8"}
        val id = json.get("id").asText() val createTime = json.get("createTime").asText() val amt = json.get("amt").asText() LateDataEvent("key", id, createTime, amt) }) .setParallelism(1) .partitionCustom(new Partitioner[String] { override def partition(key: String, numPartitions: Int): Int = { // numPartitions 是下游算子的并发数
          key.hashCode % numPartitions } }, "id") .map(l => { LateDataEvent(l.key, l.id, l.amt, l.createTime) }) .setParallelism(3)

 注:key 是传入的field 的类型并发

二、随机分区dom

根据均匀分布随机分配元素(相似于: random.nextInt(5),0 - 5 在几率上是均匀的)ide

dataStream.shuffle()

源码:函数

@Internal public class ShufflePartitioner<T> extends StreamPartitioner<T> { private static final long serialVersionUID = 1L; private Random random = new Random(); @Override public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
// 传入下游分区数
return random.nextInt(numberOfChannels); } @Override public StreamPartitioner<T> copy() { return new ShufflePartitioner<T>(); } @Override public String toString() { return "SHUFFLE"; } }

 

三、均匀分区  rebalance性能

分区元素循环,每一个分区建立相等的负载。在存在数据偏斜时用于性能优化。

dataStream.rebalance()

源码:

public class RebalancePartitioner<T> extends StreamPartitioner<T> { private static final long serialVersionUID = 1L; private int nextChannelToSendTo; @Override public void setup(int numberOfChannels) { super.setup(numberOfChannels); nextChannelToSendTo = ThreadLocalRandom.current().nextInt(numberOfChannels); } @Override public int selectChannel(SerializationDelegate<StreamRecord<T>> record) { // 轮训的发往下游分区
        nextChannelToSendTo = (nextChannelToSendTo + 1) % numberOfChannels; return nextChannelToSendTo; } public StreamPartitioner<T> copy() { return this; } @Override public String toString() { return "REBALANCE"; } }

 

四、rescale

分区元素循环到下游操做的子集。若是您但愿拥有管道,例如,从源的每一个并行实例扇出到多个映射器的子集以分配负载但又不但愿发生rebalance()会产生彻底从新平衡,那么这很是有用。这将仅须要本地数据传输而不是经过网络传输数据,具体取决于其余配置值,例如TaskManagers的插槽数。
上游操做发送元素的下游操做的子集取决于上游和下游操做的并行度。例如,若是上游操做具备并行性2而且下游操做具备并行性4,则一个上游操做将元素分配给两个下游操做,而另外一个上游操做将分配给另外两个下游操做。另外一方面,若是下游操做具备并行性2而上游操做具备并行性4,那么两个上游操做将分配到一个下游操做,而另外两个上游操做将分配到其余下游操做。在不一样并行度不是彼此的倍数的状况下,一个或多个下游操做将具备来自上游操做的不一样数量的输入。

dataStream.rescale()

源码:

public class RescalePartitioner<T> extends StreamPartitioner<T> { private static final long serialVersionUID = 1L; private int nextChannelToSendTo = -1; @Override public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
if (++nextChannelToSendTo >= numberOfChannels) { nextChannelToSendTo = 0; } return nextChannelToSendTo; } public StreamPartitioner<T> copy() { return this; } @Override public String toString() { return "RESCALE"; } }

很遗憾这段代码只能看出,上游分区往下游分区发的时候,每一个上游分区内部的数据是轮训发到下游分区的(没找到具体分配的地方,从这段代码debug,一直往上,找到分区出如今 RuntimeEnvironment 的对象里面,找不具体分配的地方)。

五、广播

向每一个分区广播元素。

dataStream.broadcast()