Alink 是阿里巴巴基于实时计算引擎 Flink 研发的新一代机器学习算法平台,是业界首个同时支持批式算法、流式算法的机器学习平台。迭代算法在不少数据分析领域会用到,好比机器学习或者图计算。本文将经过Superstep入手看看Alink是如何利用Flink迭代API来实现具体算法。html
由于Alink的公开资料太少,因此如下均为自行揣测,确定会有疏漏错误,但愿你们指出,我会随时更新。java
为何提到 Superstep 这个概念,是由于在撸KMeans代码的时候,发现几个很奇怪的地方,好比如下三个步骤中,都用到了context.getStepNo(),并且会根据其数值的不一样进行不一样业务操做:git
public class KMeansPreallocateCentroid extends ComputeFunction { public void calc(ComContext context) { LOG.info("liuhao KMeansPreallocateCentroid "); if (context.getStepNo() == 1) { /** 具体业务逻辑代码 * Allocate memory for pre-round centers and current centers. */ } } } public class KMeansAssignCluster extends ComputeFunction { public void calc(ComContext context) { ...... if (context.getStepNo() % 2 == 0) { stepNumCentroids = context.getObj(KMeansTrainBatchOp.CENTROID1); } else { stepNumCentroids = context.getObj(KMeansTrainBatchOp.CENTROID2); } /** 具体业务逻辑代码 * Find the closest cluster for every point and calculate the sums of the points belonging to the same cluster. */ } } public class KMeansUpdateCentroids extends ComputeFunction { public void calc(ComContext context) { if (context.getStepNo() % 2 == 0) { stepNumCentroids = context.getObj(KMeansTrainBatchOp.CENTROID2); } else { stepNumCentroids = context.getObj(KMeansTrainBatchOp.CENTROID1); } /** 具体业务逻辑代码 * Update the centroids based on the sum of points and point number belonging to the same cluster. */ }
查看ComContext的源码,发现stepNo的来源竟然是runtimeContext.getSuperstepNumber()
。web
public class ComContext { private final int taskId; private final int numTask; private final int stepNo; // 对,就是这里 private final int sessionId; public ComContext(int sessionId, IterationRuntimeContext runtimeContext) { this.sessionId = sessionId; this.numTask = runtimeContext.getNumberOfParallelSubtasks(); this.taskId = runtimeContext.getIndexOfThisSubtask(); this.stepNo = runtimeContext.getSuperstepNumber(); // 这里进行了变量初始化 } /** * Get current iteration step number, the same as {@link IterationRuntimeContext#getSuperstepNumber()}. * @return iteration step number. */ public int getStepNo() { return stepNo; // 这里是使用 } }
看到这里有的兄弟可能会虎躯一震,这不是BSP模型的概念嘛。我就是想写个KMeans算法,怎么除了MPI模型,还要考虑BSP模型。下面就让咱们一步一步挖掘究竟Alink都作了什么工做。算法
在 Flink 中的执行图能够分为四层:StreamGraph -> JobGraph -> ExecutionGraph -> 物理执行图apache
由于某种缘由,Flink内部对这两个概念的使用自己就有些混乱:在Task Manager里这个subtask的概念由一个叫Task的类来实现。Task Manager里谈论的Task对象实际上对应的是ExecutionGraph里的一个subtask。编程
因此这两个概念须要理清楚。api
Flink 中的程序本质上是并行的。在执行期间,每个算子 Operator (Transformation)都有一个或多个算子subTask(Operator SubTask),每一个算子的 subTask 之间都是彼此独立,并在不一样的线程中执行,而且可能在不一样的机器或容器上执行。sass
Task( SubTask) 是一个Runnable 对象, Task Manager接受到TDD 后会用它实例化成一个Task对象, 并启动一个线程执行Task的Run方法。网络
TaskDeploymentDescriptor(TDD) : 是Task Manager在submitTask是提交给TM的数据结构。 他包含了关于Task的全部描述信息。好比:
在如下状况下会从新划分task
好比有以下操做
DataStream<String> text = env.socketTextStream(hostname, port); DataStream counts = text .filter(new FilterClass()) .map(new LineSplitter()) .keyBy(0) .timeWindow(Time.seconds(10)) .sum(2)
那么StreamGraph的转换流是:
Source --> Filter --> Map --> Timestamps/Watermarks --> Window(SumAggregator) --> Sink
其task是四个:
其中每一个task又会被分红分若干subtask。在执行时,一个Task会被并行化成若干个subTask实例进行执行,一个subTask对应一个执行线程。
以上说了这么多,就是要说jobGraph和subtask,由于本文中咱们在分析源码和调试时候,主要是从jobGraph这里开始入手来看subtask。
JobGraph是在StreamGraph的基础之上,对StreamNode进行了关联合并的操做,好比对于source -> flatMap -> reduce -> sink 这样一个数据处理链,当source和flatMap知足连接的条件时,能够能够将两个操做符的操做放到一个线程并行执行,这样能够减小网络中的数据传输,因为在source和flatMap之间的传输的数据也不用序列化和反序列化,因此也提升了程序的执行效率。
相比流图(StreamGraph)以及批处理优化计划(OptimizedPlan),JobGraph发生了一些变化,已经不彻底是“静态”的数据结构了,由于它加入了中间结果集(IntermediateDataSet)这一“动态”概念。
做业顶点(JobVertex)、中间数据集(IntermediateDataSet)、做业边(JobEdge)是组成JobGraph的基本元素。这三个对象彼此之间互为依赖:
那么JobGraph是怎么组织并存储这些元素的呢?其实JobGraph只以Map的形式存储了全部的JobVertex,键是JobVertexID:
private final Map<JobVertexID, JobVertex> taskVertices = new LinkedHashMap<JobVertexID, JobVertex>();
至于其它的元素,经过JobVertex均可以根据关系找寻到。须要注意的是,用于迭代的反馈边(feedback edge)当前并不体如今JobGraph中,而是被内嵌在特殊的JobVertex中经过反馈信道(feedback channel)在它们之间创建关系。
BSP模型是并行计算模型的一种。并行计算模型一般指从并行算法的设计和分析出发,将各类并行计算机(至少某一类并行计算机)的基本特征抽象出来,造成一个抽象的计算模型。
BSP模型是一种异步MIMD-DM模型(DM: distributed memory,SM: shared memory),BSP模型支持消息传递系统,块内异步并行,块间显式同步,该模型基于一个master协调,全部的worker同步(lock-step)执行, 数据从输入的队列中读取。
BSP计算模型不只是一种体系结构模型,也是设计并行程序的一种方法。BSP程序设计准则是总体同步(bulk synchrony),其独特之处在于超步(superstep)概念的引入。一个BSP程序同时具备水平和垂直两个方面的结构。从垂直上看,一个BSP程序由一系列串行的超步(superstep)组成。
BSP模型的实现大概举例以下:
Flink-Gelly利用Flink的高效迭代算子来支持海量数据的迭代式图处理。目前,Flink Gelly提供了“Vertex-Centric”,“Scatter-Gather”以及“Gather-Sum-Apply”等计算模型的实现。
“Vertex-Centric”迭代模型也就是咱们常常听到的“Pregel”,是一种从Vertex角度出发的图计算方式。其中,同步地迭代计算的步骤称之为“superstep”。在每一个“superstep”中,每一个顶点都执行一个用户自定义的函数,且顶点之间经过消息进行通讯,当一个顶点知道图中其余任意顶点的惟一ID时,该顶点就能够向其发送一条消息。
可是实际上,KMeans不是图处理,Alink也没有基于Flink-Gelly来构建。也许只是借鉴了其概念。因此咱们还须要再探寻。
迭代算法在不少数据分析领域会用到,好比机器学习或者图计算。为了从大数据中抽取有用信息,这个时候每每会须要在处理的过程当中用到迭代计算。
所谓迭代运算,就是给定一个初值,用所给的算法公式计算初值获得一个中间结果,而后将中间结果做为输入参数进行反复计算,在知足必定条件的时候获得计算结果。
大数据处理框架不少,好比spark,mr。实际上这些实现迭代计算都是很困难的。
Flink直接支持迭代计算。Flink实现迭代的思路也是很简单,就是实现一个step函数,而后将其嵌入到迭代算子中去。有两种迭代操做算子: Iterate和Delta Iterate。两个操做算子都是在未收到终止迭代信号以前一直调用step函数。
这种迭代方式称为全量迭代,它会将整个数据输入,通过必定的迭代次数,最终获得你想要的结果。
迭代操做算子包括了简单的迭代形式:每次迭代,step函数会消费全量数据(本次输入和上次迭代的结果),而后计算获得下轮迭代的输出(例如,map,reduce,join等)
迭代过程主要分为如下几步:
它迭代的结束条件是:
编程的时候,须要调用iterate(int),该函数返回的是一个IterativeDataSet,固然咱们能够对它进行一些操做,好比map等。Iterate函数惟一的参数是表明最大迭代次数。
迭代是一个环。咱们须要进行闭环操做,那么这时候就要用到closeWith(Dataset)操做了,参数就是须要循环迭代的dataset。也能够可选的指定一个终止标准,操做closeWith(DataSet, DataSet),能够经过判断第二个dataset是否为空,来终止迭代。若是不指定终止迭代条件,迭代就会在迭代了最大迭代次数后终止。
DataSet API引进了独特的同步迭代机制(superstep-based),仅限于用在有界的流。
咱们将迭代操做算子的每一个步骤函数的执行称为单个迭代。在并行设置中,在迭代状态的不一样分区上并行计算step函数的多个实例。在许多设置中,对全部并行实例上的step函数的一次评估造成了所谓的superstep,这也是同步的粒度。所以,迭代的全部并行任务都须要在初始化下一个superstep以前完成superstep。终止准则也将被评估为superstep同步屏障。
下面是Apache原文
We referred to each execution of the step function of an iteration operator as a single iteration. In parallel setups, multiple instances of the step function are evaluated in parallel on different partitions of the iteration state. In many settings, one evaluation of the step function on all parallel instances forms a so called superstep, which is also the granularity of synchronization. Therefore, all parallel tasks of an iteration need to complete the superstep, before a next superstep will be initialized. Termination criteria will also be evaluated at superstep barriers.
下面是apache原图
归纳以下:
每次迭代都是一个superstep 每次迭代中有若干subtask在不一样的partition上分别执行step 每一个step有一个HeadTask,若干IntermediateTask,一个TailTask 每一个superstep有一个SynchronizationSinkTask 同步,由于迭代的全部并行任务须要在下一个迭代前完成
由此咱们能够知道,superstep这是Flink DataSet API的概念,可是你从这里可以看到BSP模型的影子,好比:
KMeansTrainBatchOp.iterateICQ函数中,生成了一个IterativeComQueue,而IterativeComQueue之中就用到了superstep-based迭代。
return new IterativeComQueue() .initWithPartitionedData(TRAIN_DATA, data) .initWithBroadcastData(INIT_CENTROID, initCentroid) .initWithBroadcastData(KMEANS_STATISTICS, statistics) .add(new KMeansPreallocateCentroid()) .add(new KMeansAssignCluster(distance)) .add(new AllReduce(CENTROID_ALL_REDUCE)) .add(new KMeansUpdateCentroids(distance)) .setCompareCriterionOfNode0(new KMeansIterTermination(distance, tol)) // 终止条件 .closeWith(new KMeansOutputModel(distanceType, vectorColName, latitudeColName, longitudeColName)) .setMaxIter(maxIter) // 迭代最大次数 .exec();
而BaseComQueue.exec函数中则有:
public DataSet<Row> exec() { IterativeDataSet<byte[]> loop // Flink 迭代API = loopStartDataSet(executionEnvironment) .iterate(maxIter); // 后续操做能看出来,以前添加在queue上的好比KMeansPreallocateCentroid,都是在loop之上运行的。 if (null == compareCriterion) { loopEnd = loop.closeWith... } else { // compare Criterion. DataSet<Boolean> criterion = input ... compareCriterion loopEnd = loop.closeWith( ... criterion ... ) } }
再仔细研究代码,咱们能够看出:
superstep包括:
.add(new KMeansPreallocateCentroid())
.add(new KMeansAssignCluster(distance))
.add(new AllReduce(CENTROID_ALL_REDUCE))
.add(new KMeansUpdateCentroids(distance))
终止标准就是
利用KMeansIterTermination构建了一个RichMapPartitionFunction做为终止标准。最后结束时候调用 KMeansOutputModel完成业务操做。
最大循环就是
.setMaxIter(maxIter)
因而咱们能够得出结论,superstep-based Bulk Iterate 迭代算子是用来实现总体KMeans算法,KMeans算法就是一个superstep进行迭代。可是在superstep内容若是须要通信或者栅栏同步,则采用了MPI的allReduce。
咱们须要深刻到Flink内部去挖掘验证,若是你们有兴趣,能够参见下面调用栈,本身添加断点来研究。
execute:56, LocalExecutor (org.apache.flink.client.deployment.executors) executeAsync:944, ExecutionEnvironment (org.apache.flink.api.java) execute:860, ExecutionEnvironment (org.apache.flink.api.java) execute:844, ExecutionEnvironment (org.apache.flink.api.java) collect:413, DataSet (org.apache.flink.api.java) sinkFrom:44, PrintBatchOp (com.alibaba.alink.operator.batch.utils) sinkFrom:20, PrintBatchOp (com.alibaba.alink.operator.batch.utils) linkFrom:31, BaseSinkBatchOp (com.alibaba.alink.operator.batch.sink) linkFrom:17, BaseSinkBatchOp (com.alibaba.alink.operator.batch.sink) link:89, BatchOperator (com.alibaba.alink.operator.batch) linkTo:239, BatchOperator (com.alibaba.alink.operator.batch) print:337, BatchOperator (com.alibaba.alink.operator.batch) main:35, KMeansExample (com.alibaba.alink)
Alink和Flink构建联系,是在print调用中完成的。由于是本地调试,Flink会启动一个miniCluster,而后会作以下操做。
当咱们看到了submitJob
调用,就知道KMeans代码已经和Flink构建了联系。
@Internal public class LocalExecutor implements PipelineExecutor { public static final String NAME = "local"; @Override public CompletableFuture<JobClient> execute(Pipeline pipeline, Configuration configuration) throws Exception { // we only support attached execution with the local executor. checkState(configuration.getBoolean(DeploymentOptions.ATTACHED)); final JobGraph jobGraph = getJobGraph(pipeline, configuration); final MiniCluster miniCluster = startMiniCluster(jobGraph, configuration); final MiniClusterClient clusterClient = new MiniClusterClient(configuration, miniCluster); CompletableFuture<JobID> jobIdFuture = clusterClient.submitJob(jobGraph); jobIdFuture .thenCompose(clusterClient::requestJobResult) .thenAccept((jobResult) -> clusterClient.shutDownCluster()); return jobIdFuture.thenApply(jobID -> new ClusterClientJobClientAdapter<>(() -> clusterClient, jobID)); }
生成jobGraph的具体流程是:
if (dataSet instanceof BulkIterationResultSet)
,则调用translateBulkIteration(bulkIterationResultSet);
if (c instanceof BulkIterationBase)
,以生成BulkIterationNode前面代码中,getJobGraph函数做用是生成了job graph。
而后 JobManager 根据 JobGraph 生成 ExecutionGraph。ExecutionGraph 是 JobGraph 的并行化版本,是调度层最核心的数据结构。
最后 JobManager 根据 ExecutionGraph 对 Job 进行调度后,在各个TaskManager 上部署 Task。
因此咱们须要看看最终运行时候,迭代API对应着哪些Task。
针对IterativeDataSet,即superstep-based Bulk Iterate,Flink生成了以下的task。
IterationHeadTask主要做用是协调一次迭代。
它会读取初始输入,和迭代Tail创建一个BlockingBackChannel。在成功处理输入以后,它会发送EndOfSuperstep事件给本身的输出。它在每次superstep以后会联系 synchronization task,等到本身收到一个用来同步的AllWorkersDoneEvent。AllWorkersDoneEvent表示全部其余的heads已经完成了本身的迭代。
下一次迭代时候,上一次迭代中tail的输出就经由backchannel传输,造成了head的输入。什么时候进入到下一个迭代,是由HeadTask完成的。一旦迭代完成,head将发送TerminationEvent给全部和它关联的task,告诉他们shutdown。
barrier.waitForOtherWorkers(); if (barrier.terminationSignaled()) { requestTermination(); nextStepKickoff.signalTermination(); } else { incrementIterationCounter(); String[] globalAggregateNames = barrier.getAggregatorNames(); Value[] globalAggregates = barrier.getAggregates(); aggregatorRegistry.updateGlobalAggregatesAndReset(globalAggregateNames, globalAggregates); // 在这里发起下一次Superstep。 nextStepKickoff.triggerNextSuperstep(); } }
IterationHeadTask是在JobGraphGenerator.createBulkIterationHead中构建的。其例子以下:
"PartialSolution (Bulk Iteration) (org.apache.flink.runtime.iterative.task.IterationHeadTask)"
IterationIntermediateTask是superstep中间段的task,其将传输EndOfSuperstepEvent和TerminationEvent给全部和它关联的tasks。此外,IterationIntermediateTask能更新the workset或者the solution set的迭代状态。
若是迭代状态被更新,本task的输出将传送回IterationHeadTask,在这种状况下,本task将做为head再次被安排。
IterationIntermediateTask的例子以下:
"MapPartition (computation@KMeansUpdateCentroids) (org.apache.flink.runtime.iterative.task.IterationIntermediateTask)" "Combine (SUM(0), at kMeansPlusPlusInit(KMeansInitCentroids.java:135) (org.apache.flink.runtime.iterative.task.IterationIntermediateTask)" "MapPartition (AllReduceSend) (org.apache.flink.runtime.iterative.task.IterationIntermediateTask)" "Filter (Filter at kMeansPlusPlusInit(KMeansInitCentroids.java:130)) (org.apache.flink.runtime.iterative.task.IterationIntermediateTask)"
IterationTailTask是迭代的最末尾。若是迭代状态被更新,本task的输出将经过BlockingBackChannel传送回IterationHeadTask,反馈给迭代头就意味着一个迭代完整逻辑的完成,那么就能够关闭这个迭代闭合环了。这种状况下,本task将在head所在的实例上从新被调度。
这里有几个关键点须要注意:
Flink有一个BlockingQueueBroker类,这是一个阻塞式的队列代理,它的做用是对迭代并发进行控制。Broker是单例的,迭代头任务和尾任务会生成一样的broker ID,因此头尾在同一个JVM中会基于相同的dataChannel进行通讯。dataChannel由迭代头建立。
IterationHeadTask中会生成BlockingBackChannel,这是一个容量为1的阻塞队列。
// 生成channel BlockingBackChannel backChannel = new BlockingBackChannel(new SerializedUpdateBuffer(segments, segmentSize, this.getIOManager())); // 而后block在这里,等待Tail superstepResult = backChannel.getReadEndAfterSuperstepEnded();
IterationTailTask则是以下:
// 在基类获得channel,由于是单例,因此会获得同一个 worksetBackChannel = BlockingBackChannelBroker.instance().getAndRemove(brokerKey()); // notify iteration head if responsible for workset update 在这里通知Head worksetBackChannel.notifyOfEndOfSuperstep();
而二者都是利用以下办法来创建联系,在同一个subtask中会使用同一个brokerKey,这样首尾就联系起来了。
public String brokerKey() { if (this.brokerKey == null) { int iterationId = this.config.getIterationId(); this.brokerKey = this.getEnvironment().getJobID().toString() + '#' + iterationId + '#' + this.getEnvironment().getTaskInfo().getIndexOfThisSubtask(); } return this.brokerKey; }
这是经过output.collect来完成的。
首先,在Tail初始化时候,会生成一个outputCollector,这个outputCollector会被设置为本task的输出outputCollector。这样就保证了用户函数的输出都会转流到outputCollector。
而outputCollector的输出就是worksetBackChannel的输出,这里设置为同一个instance。这样用户输出就输出到backChannel中。
@Override protected void initialize() throws Exception { super.initialize(); // set the last output collector of this task to reflect the iteration tail state update: // a) workset update, // b) solution set update, or // c) merged workset and solution set update Collector<OT> outputCollector = null; if (isWorksetUpdate) { // 生成一个outputCollector outputCollector = createWorksetUpdateOutputCollector(); // we need the WorksetUpdateOutputCollector separately to count the collected elements if (isWorksetIteration) { worksetUpdateOutputCollector = (WorksetUpdateOutputCollector<OT>) outputCollector; } } ...... // 把outputCollector设置为本task的输出 setLastOutputCollector(outputCollector); }
outputCollector的输出就是worksetBackChannel的输出buffer,这里设置为同一个instance。
protected Collector<OT> createWorksetUpdateOutputCollector(Collector<OT> delegate) { DataOutputView outputView = worksetBackChannel.getWriteEnd(); TypeSerializer<OT> serializer = getOutputSerializer(); return new WorksetUpdateOutputCollector<OT>(outputView, serializer, delegate); }
运行时候以下:
@Override public void run() throws Exception { SuperstepKickoffLatch nextSuperStepLatch = SuperstepKickoffLatchBroker.instance().get(brokerKey()); while (this.running && !terminationRequested()) { // 用户在这里输出,最后会输出到output.collect,也就是worksetBackChannel的输出buffer。 super.run(); // 这时候以及输出到channel完毕,只是通知head进行读取。 if (isWorksetUpdate) { // notify iteration head if responsible for workset update worksetBackChannel.notifyOfEndOfSuperstep(); } else if (isSolutionSetUpdate) { // notify iteration head if responsible for solution set update solutionSetUpdateBarrier.notifySolutionSetUpdate(); } ... }
IterationTailTask例子以下:
"Pipe (org.apache.flink.runtime.iterative.task.IterationTailTask)"
IterationSynchronizationSinkTask做用是同步全部的iteration heads,IterationSynchronizationSinkTask被是实现成一个 output task。其只是用来协调,不处理任何数据。
在每一次superstep,IterationSynchronizationSinkTask只是等待直到它从每个head都收到一个WorkerDoneEvent。这表示下一次superstep能够开始了。
这里须要注意的是 SynchronizationSinkTask 如何等待各个并行度的headTask。好比Flink的并行度是5,那么SynchronizationSinkTask怎么作到等待这5个headTask。
在IterationSynchronizationSinkTask中,注册了SyncEventHandler来等待head的WorkerDoneEvent。
this.eventHandler = new SyncEventHandler(numEventsTillEndOfSuperstep, this.aggregators, this.getEnvironment().getUserClassLoader()); this.headEventReader.registerTaskEventListener(this.eventHandler, WorkerDoneEvent.class);
在SyncEventHandler中,咱们能够看到,在构建时候,numberOfEventsUntilEndOfSuperstep就被设置为并行度,每次收到一个WorkerDoneEvent,workerDoneEventCounter就递增,当等于numberOfEventsUntilEndOfSuperstep,即并行度时候,就说明本次superstep中,全部headtask都成功了。
private void onWorkerDoneEvent(WorkerDoneEvent workerDoneEvent) { if (this.endOfSuperstep) { throw new RuntimeException("Encountered WorderDoneEvent when still in End-of-Superstep status."); } else { // 每次递增 ++this.workerDoneEventCounter; String[] aggNames = workerDoneEvent.getAggregatorNames(); Value[] aggregates = workerDoneEvent.getAggregates(this.userCodeClassLoader); if (aggNames.length != aggregates.length) { throw new RuntimeException("Inconsistent WorkerDoneEvent received!"); } else { for(int i = 0; i < aggNames.length; ++i) { Aggregator<Value> aggregator = (Aggregator)this.aggregators.get(aggNames[i]); aggregator.aggregate(aggregates[i]); } // numberOfEventsUntilEndOfSuperstep就是并行度,等于并行度时候就说明全部head都成功了。 if (this.workerDoneEventCounter % this.numberOfEventsUntilEndOfSuperstep == 0) { this.endOfSuperstep = true; Thread.currentThread().interrupt(); } } } }
IterationSynchronizationSinkTask的例子以下:
"Sync (BulkIteration (Bulk Iteration)) (org.apache.flink.runtime.iterative.task.IterationSynchronizationSinkTask)"
综上所述,咱们最终获得superstep以下:
***** 文字描述以下 ***** 每次迭代都是一个superstep 每次迭代中有若干subtask在不一样的partition上分别执行step 每一个step有一个HeadTask,若干IntermediateTask,一个TailTask 每一个superstep有一个SynchronizationSinkTask ***** 伪代码大体以下 ***** for maxIter : begin superstep for maxSubTask : begin step IterationHeadTask IterationIntermediateTask IterationIntermediateTask ... IterationIntermediateTask IterationIntermediateTask IterationTailTask end step IterationSynchronizationSinkTask end superstep
K-means算法的过程,为了尽可能不用数学符号,因此描述的不是很严谨,大概就是这个意思,“物以类聚、人以群分”:
KMeansPreallocateCentroid也是superstep一员,可是只有context.getStepNo() == 1
的时候,才会进入实际业务逻辑,预分配Centroid。当superstep为大于1的时候,本task会执行,但不会进入具体业务代码。
public class KMeansPreallocateCentroid extends ComputeFunction { private static final Logger LOG = LoggerFactory.getLogger(KMeansPreallocateCentroid.class); @Override public void calc(ComContext context) { // 每次superstep都会进到这里 LOG.info(" KMeansPreallocateCentroid 我每次都会进的呀 "); if (context.getStepNo() == 1) { // 实际预分配业务只进入一次 } } }
KMeansAssignCluster 做用是为每一个点(point)计算最近的聚类中心,为每一个聚类中心的点坐标的计数和求和。
KMeansUpdateCentroids 做用是基于计算出来的点计数和坐标,计算新的聚类中心。
Alink在整个计算过程当中维护一个特殊节点来记住待求中心点当前的结果。
这就是为啥迭代时候须要区分奇数次和偶数次的缘由了。奇数次就表示老大哥,偶数次就表示新大哥。每次superstep只会计算一批大哥,留下另一批大哥作距离比对。
另外要注意的一点是:普通的迭代计算,是经过Tail给Head回传用户数据,可是KMeans这里的实现并无采用这个办法,而是把计算出来的中心点都存在共享变量中,在各个intermediate之间互相交互。
public class KMeansAssignCluster extends ComputeFunction { public void calc(ComContext context) { ...... if (context.getStepNo() % 2 == 0) { stepNumCentroids = context.getObj(KMeansTrainBatchOp.CENTROID1); } else { stepNumCentroids = context.getObj(KMeansTrainBatchOp.CENTROID2); } /** 具体业务逻辑代码 * Find the closest cluster for every point and calculate the sums of the points belonging to the same cluster. */ } } public class KMeansUpdateCentroids extends ComputeFunction { public void calc(ComContext context) { if (context.getStepNo() % 2 == 0) { stepNumCentroids = context.getObj(KMeansTrainBatchOp.CENTROID2); } else { stepNumCentroids = context.getObj(KMeansTrainBatchOp.CENTROID1); } /** 具体业务逻辑代码 * Update the centroids based on the sum of points and point number belonging to the same cluster. */ }
这里要特殊说明,由于KMeansOutputModel是最终输出模型,而KMeans算法的实现是:全部subtask都拥有全部中心点,就是说全部subtask都会有相同的模型,就没有必要所有输出,因此这里限定了第一个subtask才能输出,其余的都不输出。
@Override public List <Row> calc(ComContext context) { // 只有第一个subtask才输出模型数据。 if (context.getTaskId() != 0) { return null; } .... modelData.params = new KMeansTrainModelData.ParamSummary(); modelData.params.k = k; modelData.params.vectorColName = vectorColName; modelData.params.distanceType = distanceType; modelData.params.vectorSize = vectorSize; modelData.params.latitudeColName = latitudeColName; modelData.params.longtitudeColName = longtitudeColName; RowCollector collector = new RowCollector(); new KMeansModelDataConverter().save(modelData, collector); return collector.getRows(); }
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/batch/iterations.html
聚类、K-Means、例子、细节
Flink-Gelly:Iterative Graph Processing