建立完成 MaxCompute Java Module后,便可以开始开发Graph了。html
在examples目录下有graph的一些代码示例,可参考示例熟悉Graph程序的结构。java
在module的源码目录即src>main >javanewMaxCompute Java。node
选择GraphLoader/Vertex等类型,NameOK**,模板会自动填充框架代码,可在此基础上继续修改。算法
Graph开发好后,下一步就是要测试本身的代码,看是否符合预期。咱们支持本地运行Graph,具体的:sql
**说明** 关于warehouse的详细介绍请参考[开发UDF](https://help.aliyun.com/document_detail/50902.html#warehous)中本地warehouse目录部分。
本地调试经过后,接下来就能够把Graph发布到服务端,在MaxCompute分布式环境下运行了:编程
首先,将本身的Graph程序打成jar包,并发布到服务端。如何打包发布?api
经过Studio无缝集成的MaxCompute Console(在Project ExplorerOpen in Console**),在Console命令行中输入相似以下的 jar命令:试用数组
-libjars xxx.jar -classpath /Users/home/xxx.jar com.aliyun.odps.graph.examples.PageRank pagerank_in pagerank_out;网络
更详细的Graph开发介绍请参见[编写Graph](https://help.aliyun.com/document_detail/27813.html#concept-gzg-1c2-vdb)。 <a name="Eclipse"></a> ## Eclipse 建立MaxCompute项目后,用户能够编写本身的Graph程序,参照下文步骤操做完成本地调试。<br />在此示例中,咱们选用插件提供的 PageRank.java来完成本地调试工做。选中 **examples**下的 PageRank.java文件,以下图。 <br /> [<br /><br />右键单击,选择 ****Debug As** >ODPS MapReduce|Graph****,以下图。 <br />[](http://static-aliyun-doc.oss-cn-hangzhou.aliyuncs.com/assets/img/12154/15450228643220_zh-CN.png)<br /><br />单击后出现对话框,做以下配置。 <br />[](http://static-aliyun-doc.oss-cn-hangzhou.aliyuncs.com/assets/img/12154/15450228643221_zh-CN.png) <br /><br />查看做业运行结果,以下图。 <br />[](http://static-aliyun-doc.oss-cn-hangzhou.aliyuncs.com/assets/img/12154/15450228643222_zh-CN.png)<br /><br />能够查看在本地的计算结果,以下图。 <br />[](http://static-aliyun-doc.oss-cn-hangzhou.aliyuncs.com/assets/img/12154/15450228643223_zh-CN.png)<br /><br />调试经过后,用户能够将程序打包,并以Jar资源的形式上传到MaxCompute,并提交Graph做业。 <a name="b54d1384"></a> # MaxCompute Graph的最佳实践 <a name="cc20fa1e"></a> ## 基于MaxCompute Graph实现用户聚类 <a name="95da745d"></a> ### 场景说明 在商品品牌预测中,提供了一份用户行为数据,以下: | 字段 | 字段说明 | 提取说明 | | --- | --- | --- | | user_id | 用户标识 | 抽样&字段加密 | | brand_id | 品牌ID | 抽了样&字段加密 | | type | 用户对品牌的行为类型 | 点击:0<br />购买:1<br />收藏:2<br />加入购物车:3 | | visit_datetime | 行为时间 | 格式某月某日,如7月6日, 隐藏年份 | 假设需求是但愿基于用户的购买行为对用户聚类。当用户浏览时,能够给TA推荐同一个聚类(兴趣度相近)的其余用户购买了什么。 <a name="094c47ac"></a> ### [](https://www.atatech.org/articles/32335#1)问题分析 在推荐领域,该问题属于基于用户的协同过滤范畴,它主要包括两个步骤:一是找到和目标用户兴趣度相近的用户集合;二是给目标用户推荐该集合中其余用户感兴趣(而目标用户没听过)的item。<br />对用户聚类即构建兴趣度相近的用户集合,常见的一种方式是经过Kmeans算法来实现。假定要把样本划分为k个类别,Kmeans算法的计算过程以下: * 选择k个初始中心节点; * 在每次迭代中,对每一个样本,计算其到中心节点的距离; * 更新中心节点 * 若是中心节点不变(或小于阈值),迭代结束;不然继续步骤2)、3)迭代 Kmeans算法的优点在于简洁快速,其关键在于初始中心节点的选择和距离公式。<br />在这个示例中,首先应该对数据进行预处理,构造用户的特征向量。出于简单,这里选择10个最hot的品牌(构造次数最多),基于用户对这10个品牌的购买次数,构造特征以下:<br />user_id, cnt1, …, cnt10,其中cnt表示对应品牌的购买次数。<br />而后经过Graph编程框架,经过KMeans算法实现聚类。 <a name="9195cc17"></a> ### [](https://www.atatech.org/articles/32335#2)数据准备 原始数据表为tmall_user_brand,数据准备主要包括生成特征和选择初始节点。 <a name="1c72fb00"></a> ### [](https://www.atatech.org/articles/32335#3)生成特征 生成特征包括以下步骤: 1\. 选择top 10 brands,生成表b 1\. 统计用户购买每一个品牌的次数,生成表t 1\. 对表b和t进行联接,统计用户购买top 10品牌的次数,生成表ub 假设ub表数据以下:
user_id brand_id count rank
a b1 5 1
a b3 2 3
a b4 3 4
b b3 1 3
b b7 9 7并发
<br />须要生成的特征表以下<br /><br />
user_id, cnt1, … , cnt10
a 5 0 2 3 0 0 0 0 0 0
b 0 0 1 0 0 0 9 0 0 0
<br />这里为了代码简短,经过SQL来“补”数据,经过sum(case when…)方式实现。<br />完整的SQL语句以下:<br />
create table t_user_feature as
select
user_id, sum(case when rank=1 then cnt else 0 end) as cnt1, sum(case when rank=2 then cnt else 0 end) as cnt2, sum(case when rank=3 then cnt else 0 end) as cnt3, sum(case when rank=4 then cnt else 0 end) as cnt4, sum(case when rank=5 then cnt else 0 end) as cnt5, sum(case when rank=6 then cnt else 0 end) as cnt6, sum(case when rank=7 then cnt else 0 end) as cnt7, sum(case when rank=8 then cnt else 0 end) as cnt8, sum(case when rank=9 then cnt else 0 end) as cnt9, sum(case when rank=10 then cnt else 0 end) as cnt10
from(
select /*+ MAPJOIN(b) */ t.user_id, t.brand_id, t.cnt, b.rank from( select user_id, brand_id, count(*) as cnt from tmall_user_brand where type='1' group by user_id, brand_id )t join( select brand_id, rank from( select brand_id, row_number() over (partition by 1 order by buy_cnt desc) as rank from( select brand_id, count(*) as buy_cnt from tmall_user_brand where type='1' group by brand_id )t1 )t2 where t2.rank <=10 )b on t.brand_id = b.brand_id
)ub
group by user_id;
alter table t_user_feature set lifecycle 7;
<a name="b28d718c"></a> ### 选择初始节点 对于Kmeans算法,初始节点的选取对聚类结果很重要,有不少paper研究如何选择初始节点。这里出于简单,直接随机选取3个节点,SQL以下:
drop table if exists t_kmeans_seed;
create table t_kmeans_seed as
select user_id,
cnt1,cnt2,cnt3,cnt4,cnt5,cnt6,cnt7,cnt8,cnt9,cnt10
from(
select user_id, cnt1,cnt2,cnt3,cnt4,cnt5,cnt6,cnt7,cnt8,cnt9,cnt10, cluster_sample(3) over (partition by 1) as flag from t_user_feature
)t1
where flag = true;
alter table t_kmeans_seed set lifecycle 7;
<a name="cecbfece"></a> ### [实现Kmeans聚类](https://www.atatech.org/articles/32335#5) 这里咱们基于在线手册Graph示例程序的“k-均值聚类算法”来实现。代码以下:
package example.demo;
public class KmeansDemo {
private final static Logger LOG = Logger.getLogger(KmeansDemo.class);
private static String RESOURCE_TABLE;
public static class KmeansVertex extends
Vertex<Text, Tuple, NullWritable, NullWritable> { @Override public void compute( ComputeContext<Text, Tuple, NullWritable, NullWritable> context, Iterable<NullWritable> messages) throws IOException { context.aggregate(this.getValue()); }
}
public static class KmeansVertexReader extends
GraphLoader<Text, Tuple, NullWritable, NullWritable> { @Override public void load(LongWritable recordNum, WritableRecord record, MutationContext<Text, Tuple, NullWritable, NullWritable> context) throws IOException { Tuple val = new Tuple(); for(int i=1; i<record.size(); ++i) { val.append(record.get(i)); } KmeansVertex vertex = new KmeansVertex(); vertex.setId(new Text(String.valueOf(record.get(0)))); vertex.setValue(val); context.addVertexRequest(vertex); }
}
public static class KmeansAggrValue implements Writable {
Tuple centers = new Tuple(); Tuple sums = new Tuple(); Tuple counts = new Tuple(); @Override public void write(DataOutput out) throws IOException { centers.write(out); sums.write(out); counts.write(out); } @Override public void readFields(DataInput in) throws IOException { centers = new Tuple(); centers.readFields(in); sums = new Tuple(); sums.readFields(in); counts = new Tuple(); counts.readFields(in); } @Override public String toString() { return "centers " + centers.toString() + ", sums " + sums.toString() + ", counts " + counts.toString(); }
}
public static class KmeansAggregator extends Aggregator {
@Override public KmeansAggrValue createStartupValue(WorkerContext context) throws IOException { KmeansAggrValue aggrVal = null; aggrVal = new KmeansAggrValue(); aggrVal.centers = new Tuple(); aggrVal.sums = new Tuple(); aggrVal.counts = new Tuple(); RESOURCE_TABLE = context.getConfiguration().get("RESOURCE_TABLE"); Iterable<WritableRecord> iter = context.readResourceTable(RESOURCE_TABLE); for(WritableRecord record : iter) { Tuple center = new Tuple(); Tuple sum = new Tuple(); for (int i = 1; i < record.size(); ++i) { center.append(record.get(i)); sum.append(new LongWritable(0L)); } LongWritable count = new LongWritable(0L); aggrVal.sums.append(sum); aggrVal.counts.append(count); aggrVal.centers.append(center); } return aggrVal; } @Override public KmeansAggrValue createInitialValue(WorkerContext context) throws IOException { return (KmeansAggrValue) context.getLastAggregatedValue(0); } @Override public void aggregate(KmeansAggrValue value, Object item) { int min = 0; long mindist = Long.MAX_VALUE; Tuple point = (Tuple) item; for (int i = 0; i < value.centers.size(); i++) { Tuple center = (Tuple) value.centers.get(i); // use Euclidean Distance, no need to calculate sqrt long dist = 0L; for (int j = 0; j < center.size(); j++) { long v = ((LongWritable) point.get(j)).get() - ((LongWritable) center.get(j)).get(); dist += v * v; } if (dist < mindist) { mindist = dist; min = i; } } // update sum and count Tuple sum = (Tuple) value.sums.get(min); for (int i = 0; i < point.size(); i++) { LongWritable s = (LongWritable) sum.get(i); s.set(s.get() + ((LongWritable) point.get(i)).get()); } LongWritable count = (LongWritable) value.counts.get(min); count.set(count.get() + 1L); } @Override public void merge(KmeansAggrValue value, KmeansAggrValue partial) { for (int i = 0; i < value.sums.size(); i++) { Tuple sum = (Tuple) value.sums.get(i); Tuple that = (Tuple) partial.sums.get(i); for (int j = 0; j < sum.size(); j++) { LongWritable s = (LongWritable) sum.get(j); s.set(s.get() + ((LongWritable) that.get(j)).get()); } } for (int i = 0; i < value.counts.size(); i++) { LongWritable count = (LongWritable) value.counts.get(i); count.set(count.get() + ((LongWritable) partial.counts.get(i)).get()); } } @SuppressWarnings("rawtypes") @Override public boolean terminate(WorkerContext context, KmeansAggrValue value) throws IOException { // compute new centers Tuple newCenters = new Tuple(value.sums.size()); for (int i = 0; i < value.sums.size(); i++) { Tuple sum = (Tuple) value.sums.get(i); Tuple newCenter = new Tuple(sum.size()); LongWritable c = (LongWritable) value.counts.get(i); if(c.equals(0L)) { continue; } for (int j = 0; j < sum.size(); j++) { LongWritable s = (LongWritable) sum.get(j); newCenter.set(j, new LongWritable(new Double((double)s.get()/ c.get()+0.5).longValue())); // reset sum for next iteration s.set(0L); } // reset count for next iteration c.set(0L); newCenters.set(i, newCenter); } // update centers Tuple oldCenters = value.centers; value.centers = newCenters; LOG.info("old centers: " + oldCenters + ", new centers: " + newCenters); // compare new/old centers boolean converged = true; for (int i = 0; i < value.centers.size() && converged; i++) { Tuple oldCenter = (Tuple) oldCenters.get(i); Tuple newCenter = (Tuple) newCenters.get(i); long sum = 0L; for (int j = 0; j < newCenter.size(); j++) { long v = ((LongWritable) newCenter.get(j)).get() - ((LongWritable) oldCenter.get(j)).get(); sum += v * v; } double dist = Math.sqrt(sum); LOG.info("old center: " + oldCenter + ", new center: " + newCenter + ", dist: " + dist); // converge threshold for each center: 0.05 converged = dist < 0.05d; } if (converged || context.getSuperstep() == context.getMaxIteration() - 1) { // converged or reach max iteration, output centers for (int i = 0; i < value.centers.size(); i++) { context.write(((Tuple) value.centers.get(i)).toArray()); } // true means to terminate iteration return true; } // false means to continue iteration return false; }
}
private static void printUsage() {
System.out.println("Usage: <in> <out> <resource> [Max iterations (default 30)]"); System.exit(-1);
}
public static void main(String[] args) throws IOException {
if (args.length < 3) printUsage(); GraphJob job = new GraphJob(); job.setGraphLoaderClass(KmeansVertexReader.class); job.setRuntimePartitioning(false); job.setVertexClass(KmeansVertex.class); job.setAggregatorClass(KmeansAggregator.class); job.addInput(TableInfo.builder().tableName(args[0]).build()); job.addOutput(TableInfo.builder().tableName(args[1]).build()); job.set("RESOURCE_TABLE", args[2]); // default max iteration is 30 job.setMaxIteration(30); if (args.length >= 4) job.setMaxIteration(Integer.parseInt(args[3])); long start = System.currentTimeMillis(); job.run(); System.out.println("Job Finished in " + (System.currentTimeMillis() - start) / 1000.0 + " seconds");
}
}
<br />和MapReduce编程框架相似,在main函数,先实例化一个GraphJob,对job设置后,经过job.run()提交。<br />KmeansVertexReader类实现加载图,定义图节点。因为kmeans算法是计算节点距离,所以不须要定义边;此外它须要对迭代结果进行汇总,因此经过KmeansAggregator继承Aggregator,实现每一步迭代计算。<br /><br /> <a name="f1e68d34"></a> ### [](https://www.atatech.org/articles/32335#6)运行和输出 准备结果表SQL以下:
create table t_kmeans_result(
cnt1 bigint, cnt2 bigint, cnt3 bigint, cnt4 bigint, cnt5 bigint, cnt6 bigint, cnt7 bigint, cnt8 bigint, cnt9 bigint, cnt10 bigint) lifecycle 7;
<br />在console中执行以下命令:<br /><br />
add jar /home/admin/duckrun/dev/open_graph_example/target/open_graph_example-0.1.jar -f;
add table t_kmeans_seed -f;
jar -resources open_graph_example-0.1.jar,t_kmeans_seed -classpath /home/admin/duckrun/dev/open_graph_examp
<a name="d41d8cd9"></a> ## <a name="388ac3a4"></a> ## 基于MaxCompute Graph实现并行化层次聚类 <a name="8e1b944f"></a> ### 背景 图聚类是常见的一种聚类场景。和基于向量的聚类不一样,图的每一个节点只和有限个节点有距离,没法定义任意两点之间的距离。所以,像k-means这类常规方法就不适合图聚类。本文要介绍的是用层次聚类(hierarchical clustering)的方法作图聚类,其中为简单起见,图是无向的。 <a name="90179ba4"></a> ### [](https://www.atatech.org/articles/25067#1)聚类过程 标准的自底向上的层次聚类过程是这样的:每次选取距离最小的两个点merge,直到最后只剩一个点(包含全部的原始点)为止。聚类过程涉及到点和点,以及簇和簇之间距离计算的不一样方法;具体的能够参考[维基百科的解释](http://en.wikipedia.org/wiki/Hierarchical_clustering)。<br />基于无向图的层次聚类和标准层次聚类是相似的,用边的权值来度量节点之间的距离,同时更新合并节点的邻居节点之间的边。用伪代码描述过程以下:
图加载;
While(不知足聚类中止条件) {
选取距离最小的边edgeAB;
生产新的节点AB;
生产新的边,AB和A,B的全部邻居之间;
删除A和A邻居之间的边,删除B和B邻居之间的边;
删除A,删除B;
}
<a name="85901219"></a> ### [](https://www.atatech.org/articles/25067#2)MaxCompute Graph实现细节 层次聚类实现的核心是经过Vertex的compute来实现的。定义Vertex的执行状态,分别包括:选举状态(minedge_electing);等待选举结果状态(waiting_election);中止状态(waiting_delete)。
Vertex.compute() {
switch(current_state) {
case minedge_electing:
if(存在邻居节点)
选取和邻居节点之间最小的边,发送给aggregator;
else
voteToHalt(); //没有邻居,中止计算退出;
break;
case waiting_election:
从aggregator获取全局选取的最小边minEdge; if(minEdge的距离值>阀值距离) voteToHalt(); //没有能够再作聚合的簇了,中止计算并退出; else if(minEdge不是本节点和某个邻居节点之间的边) 转换状态到minedge_electing,准备下一轮选举迭代;
else {
//假设本节点为A, minEdge对应的邻居为B
addVertexRequest(AB); //mergeA和B新生产节点
for(Vertex neighbor: A’s neighbors) {
removeEdgeRequest(A->neighbor); removeEdgeRequest(neighbor->A); if(neighbor不是B) { addEdgeRequest(AB->neighbor); addEdgeRequest(neighbor->AB);
}
}
removeVertexRequest(A);
转换状态到waiting_delete;
}
break;
case waiting_delete:
voteToHalt();
break;
}
}
全局Aggregator定义:两两比较边的距离值,选取最小的那个;<br />节点冲突Resolver定义:当A节点发现minEdge是edge(A B)的同时,B也一样发现,其处理流程和A是对称相等的,所以会出现冲突(重复增长新节点AB,重复增长和删除边edge(AB, C),当C和A,B都有链接的时候)。以下图所示:C节点是A,B的共同邻居,所以A,B合并为新的节点AB后,针对C节点就须要特别处理冲突的状况;而D,E的处理就相对简单。<br /> <a name="dd76d17c"></a> ### [](https://www.atatech.org/articles/25067#3)并行近似优化 上述的聚类流程中,真正并行化执行只是在选举最短距离的过程(单机版须要扫描全部的边,graph分布式由节点把相邻的最短距离report to aggregator),而merge仅仅只有两个节点参与。因为graph框架自己的耗费,实际测试发现程序执行速度并不理想。<br />既然在节点merge的过程没有并行化,那么就思考是否在这块能够作并行化处理,答案是确定的。例以下图中,边edgeAB能够merge的同时,是否能够考虑把edgeGH也merge。<br /><br />从图上看出edgeAB和edgeGH之间路径相对比较远,同步merge G,H对全局结果的影响不大,按照标准的全局选举流程,最终也会选择G,H来merge。固然,理论上来讲,有可能因为A,B合并了之后,致使和周围节点边更新,从而影响了后续的全局选举结果。所以,并行化的merge节点最终是一个近似的结果。为了保证近似结果的可靠性,第一在于同时可merge节点之间的路径要足够的远,相互影响的可能性就小。考虑一个极端的状况,就是路径无穷大,实质是不连通的状况,那么同时merge就彻底没有风险了。第二,必须保证节点merge之后,生成新边的权重要合理,以保证并行化merge顺序和非并行化merge顺序近似一致,有关这一点后续会细说。<br />修改选举最短距离边的实现,不用全局选举的结果,而是在必定路径范围内选举出最短距离边,而后merge,这样就同时会选举出多个局部最短距离边。可同步merge的边必须知足一个最短路径阀值,以下图所示:edgeAB和edgeDE是能够同步merge的,不会起冲突,由于对节点C而言,分别增长了两个新的节点;若是edgeAB和edgeCD同步merge,那就会起冲突,由于两个新生产的节点之间也须要产生邻居关系。所以,必须保证同步merge的边之间至少存在一个不变化的节点,这样就避免了新节点之间的邻居关系生成。<br /><br />在局部选举的过程当中,依然采用的是节点report本身所知道的最短距离,只是将report给aggregator,改成report给邻居,而且经过屡次迭代实现传播功能。局部选举的伪代码以下:
Step1:
选取和邻居节点之间距离最小的边,发送给全部邻居节点以及本节点;进入step2;
Step2:
从接受的消息中选取距离最小的边(包括了在step1中邻居以及本节点选取的结果),发送给全部邻居节点以及本节点;进入step3;
Step3:
从接受的消息中选取距离最小的边(包括了在step2中邻居以及本节点选取的结果),发送给全部邻居节点以及本节点;进入step4;
……….
StepN:
从接受的消息中选取距离最小的边(包括了在stepN-1中邻居以及本节点选取的结果),若是minEdge是本节点的一条边,那么就进行merge,不然进入step1;
事实上,每一step就是不断地选举局部最短距离边,而且把这个信息逐层扩散,这样就确保了在必定的路径范围内永远只选举一个最短距离边。N的设置能够配置,显然,N越小,并行化程度就越高。固然,必须避免冲突,所以N的最小取值为3。 <a name="ce23477e"></a> ### [](https://www.atatech.org/articles/25067#4)边权重更新 层次聚类过程当中,簇和簇之间距离的计算能够参考[维基百科](http://en.wikipedia.org/wiki/Hierarchical_clustering) 提到的各类方法。本文参考的是[Ward方法](http://en.wikipedia.org/wiki/Ward%27s_method) 来计算节点merge之后和邻居节点之间的边权重。另外要说明的是有关边距离的度量,因为本文提出的方法是针对淘宝商品[interest entity node](http://dthink.alibaba-inc.com/articles/commonalg/interestgraph.htm)聚类的实现,而输入是node和node之间协同类似度(看了又看,买了又买);所以节点之间的距离度量是和类似度成反比的。类似度越大,等同于距离就越小。为简单起见,就直接用类似度做为距离的度量。每次选举局部距离最小的节点对,便是选举类似度最大的节点对。<br />基于Ward的思想,把要merge的两个簇的节点数量做为衡量的标准,同时考虑到下降边权重减弱的速度,最终用如下的方法作更新:<br />假设要merge的两个节点分别为A和B,节点nA,nB分别是A和B的邻居;<br />nA,nB和新节点AB的类似度计算:
sim(nA, AB)=sim(A, nA) * alphaA;
sim(nB, AB)=sim(B, nB) * alphaB;
当size(A) + size(B)=2的时候,alphaA=alphaB=0.9;
不然alphaA=sqrt(sizeA) / sqrt(sizeA + sizeB), alphaB=sqrt(sizeB) / sqrt(sizeA + sizeB)。
当nA和nB为同一个节点的时候,也即A,B共同邻居,和新节点AB的类似度最终合并为:(sim(A, nA) + sim(B, nB)) * 0.618。
<a name="d41d8cd9-1"></a> ### [](https://www.atatech.org/articles/25067#5) <a name="992bf7f5"></a> ## 基于MaxCompute Graph实现大规模网络的关系扩散 关系数据相关的实体有天然人、企业、媒介、帐号等,如何对由亿级别的节点和边组成的大规模网络进行有效的图计算是一个刚性需求。 <a name="2fe57705"></a> ### [](https://www.atatech.org/articles/104874#0)问题抽象 若是有一个亿级别的大规模有向网络(就假设为微博的用户关注关系网络好了,便于理解),如何进行关系扩散找到用户可能想关注的其余用户呢?打个比方,A用户关注了B,B又关注了C,那么可能C就是A想要关注的潜在用户,如今咱们要作的事就是把全部的C找出来推荐给A,最好还要把A到C的关注链路也一并输出,便于其余深刻的分析。咱们的目标定为四度关系扩散,A—>B—>C—>D—>E,找到E。 <a name="01ea6c5e"></a> ### [](https://www.atatech.org/articles/104874#1)暴力解法 最直接的想法就是对已有的一度关系表进行一次join获得两度关系,进行两次join获得三度关系,依次类推。假设网络是均匀分布的,每一个人关注的人数量级差很少,利用MaxCompute强大的计算能力,这种方法还有可能会计算出结果。然而现实的网络结构每每会存在小部分的出边和入边远大于平均水平的超点(微博大V),这些点在join的过程当中极易形成数据倾斜,一次join还能勉勉强强接受,但两次三次join最后99.9%会以计算失败了结。<br />那么利用MaxCompute Graph的sendMessage机制可否解决这个问题呢?在每一迭代步里,每一个节点都将自身的节点值添加到上游节点传来的路径后面,再将路径当作message传递给下游节点,以下图所示,计算过程以下:<br />[](http://ata2-img.cn-hangzhou.img-pub.aliyun-inc.com/98ed95e8939df0408db2fee4eb23c0b9.png)<br />第一步:每一个节点的value设置为自身的id,并将value发送全部出边的终点;<br />第二步:每一个节点将收到的全部消息存储为一个list,将自身id添加到list里面的每一个元素后面,再将这个list发送给下游节点;<br />第三~五步:重复第二步。第五步输出的长度为5的路径便是咱们想要的结果。<br />[](http://ata2-img.cn-hangzhou.img-pub.aliyun-inc.com/644dbbfd4438f4ada3e51e1a1b91d147.png)<br />但最终实践证实,这种方法在到第二步之后就会内存不够报错,尽管已经将各项参数调到最大,仍是不行。主要缘由是发送消息采用数组的形式太占内存,每一步都将自身节点添加到全部路径后面也会有重复存储的问题,看来还有不少优化的空间。 <a name="ac500c12"></a> ### [](https://www.atatech.org/articles/104874#2)两度关系 咱们先从最简单的两度关系入手,因为MaxCompute Graph是以点为粒度进行输出的,所以咱们只需找到每一个节点的头和尾,至关于把两度路径的中间节点固定住,遍历头部节点和尾部节点,就能够输出全部的两度路径了。实现很简单,首先定义一个MyValue的class存储全部的上游节点值和下游节点值以及自身节点值:
public static class MyValue implements Writable {
private Tuple downVertex; //下游节点 private Text selfId; //自身节点 private Tuple upVertex; //上游节点 public MyValue() { downVertex = new Tuple(); selfId = new Text(); upVertex = new Tuple(); } public MyValue(Text id) { downVertex = new Tuple(); selfId = new Text(id); upVertex = new Tuple(); } public void setSelfId(Text id) { selfId = id; } public void setDownVertex(Tuple value) { downVertex = value; } public void setUpVertex(Tuple value) { upVertex = value; } public Tuple getDownVertex() { return downVertex; } public Text getSelfId() { return selfId; } public Tuple getUpVertex() { return upVertex; } public void addDownVertex(Writable value) { downVertex.append(value); } public void addUpVertex(Writable value) { upVertex.append(value); } @Override public void write(DataOutput out) throws IOException { upVertex.write(out); selfId.write(out); downVertex.write(out); } @Override public void readFields(DataInput in) throws IOException { upVertex.readFields(in); selfId.readFields(in); downVertex.readFields(in); }
}
而后进行简单的5步迭代便可获得结果:
switch ((int) context.getSuperstep()) {
case 0: //设置自身节点值 getValue().setSelfId(getId()); break; case 1: //发送自身id给下游节点 if (hasEdges()) { context.sendMessageToNeighbors(this, new MyValue(getId())); } break; case 2: //存储收到的消息,存储为上游节点列表 for (MyValue msg : messages) { getValue().addUpVertex(msg.getSelfId()); } break; case 3: //发送自身id给上游节点 for (Writable id : getValue().getUpVertex().getAll()) { context.sendMessage((Text) id, new MyValue(getId())); } break; case 4: //存储收到的消息,存储为下游节点列表 for (MyValue msg : messages) { getValue().addDownVertex(msg.getSelfId()); } break;
}
最后将结果输出便可:
@Override
public void cleanup(WorkerContext context)
throws IOException { context.write(new Text(getValue().getUpVertex().toDelimitedString(',')), getId(), new Text(getValue().getDownVertex().toDelimitedString(',')));
}
输出结果的第一列和第三列均为数组,第二列为当前的节点,利用trans_array函数便可将数组转换为多行。这里有个坑须要注意,sql不能写成下面的形式:
select trans_array(2, ',', node1, node2, node3) as (node1, node2, node3)
from
(
select trans_array(2, ',', node2, node3, node1) as (node2, node3, node1) from result_table
)t1
由于这样会把那些出边和入边很是多的节点同时解析两列trans_array的工做量分配到一个mapper上,形成严重的数据倾斜,写成下面的形式便可进行两次的资源分配,极大地下降数据倾斜的程度。
drop table if exists result_table_left;
create table result_table_left lifecycle 7 as
select trans_array(2, ',', node2, node3, node1) as (node2, node3, node1)
from result_table;
drop table if exists result_table_right;
create table result_table_right lifecycle 7 as
select trans_array(2, ',', node1, node2, node3) as (node1, node2, node3)
from result_table_left;
<a name="b1b13f88"></a> ### [](https://www.atatech.org/articles/104874#3)三度关系 既然两度关系能够利用MaxCompute Graph的特性固定住中间的节点,那么天然地,咱们能够想到,三度关系能够固定住中间的两个节点,变成以关系对的粒度(也就是边的粒度)进行路径头和尾的遍历。可是Graph的输出是以点为粒度,想要实现边的粒度还须要再多发送一次消息,以下所示:
switch ((int) context.getSuperstep()) {
case 0: //设置自身节点值 getValue().setSelfId(getId()); break; case 1: //发送自身id给下游节点 if (hasEdges()) { context.sendMessageToNeighbors(this, new MyValue(getId())); } break; case 2: //存储收到的消息,存储为上游节点列表 for (MyValue msg : messages) { getValue().addUpVertex(msg.getSelfId()); } break; case 3: //发送自身id给上游节点 for (Writable id : getValue().getUpVertex().getAll()) { context.sendMessage((Text) id, new MyValue(getId())); } break; case 4: //存储收到的消息,存储为下游节点列表 for (MyValue msg : messages) { getValue().addDownVertex(msg.getSelfId()); } break; case 5: //再将下游节点的值发送给上游 for (Writable id : getValue().getUpVertex().getAll()) { context.sendMessage((Text) id, getValue()); } break; case 6: //结果输出 [上游节点列表,本节点,当前消息所属的下游节点,下游节点的下游节点列表] for (MyValue msg : messages) { context.write(new Text(getValue().getUpVertex().toDelimitedString(',')), getId(), msg.getSelfId(), new Text(msg.getDownVertex().toDelimitedString(','))); } break;
}
最后再像二度关系里面用两次trans_array解析便可获得全部的三度关系路径了。 <a name="3ad2b9ec"></a> ### [](https://www.atatech.org/articles/104874#4)四度关系 一样按照以前的思路,四度关系至关于固定住中间的三个节点再进行头部节点和尾部节点的遍历。那么问题来了,固定住一个节点能够看作以点为粒度进行遍历,固定住两个节点能够看作是以边为粒度进行遍历,那么固定住三个节点至关于什么呢?问题好像不可解了。可是咱们能够换个思路来看,若是咱们把固定住三个节点转换为固定住两个节点呢?以下图所示,咱们已经经过两度关系的输出获得全部的三个节点的路径,如A和C,那么咱们在A和C上新加一条边,将边的值设置为中间节点B的节点值,这样就能够变成两个节点了!而原来的边还保留,只是边的值为空。<br /><br />[](http://ata2-img.cn-hangzhou.img-pub.aliyun-inc.com/10e374d629e8d1fb718255226111abf8.png)<br />所以,咱们从新用一度关系边和两度关系的虚拟边构建一个新的网络,再在新的网络上运用三度关系的迭代方法。注意,添加虚拟边后会让节点的上下游节点列表变大,所以,前四步构建上下游节点列表时需加一条判断边的值为空的条件,而后第五步和第六步输出路径时须要判断边不为空。
switch ((int) context.getSuperstep()) {
case 0: //设置自身节点值 getValue().setSelfId(getId()); break; case 1: //发送自身id给下游节点 if (hasEdges()) { for (Edge<Text, Text> e : getEdges()) { if (e.getValue().equals(new Text(""))) { context.sendMessage(e.getDestVertexId(), new MyValue(getId())); } } } break; case 2: //存储收到的消息,存储为上游节点列表 for (MyValue msg : messages) { getValue().addUpVertex(msg.getSelfId()); } break; case 3: //发送自身id给上游节点 for (Writable id : getValue().getUpVertex().getAll()) { context.sendMessage((Text) id, new MyValue(getId())); } break; case 4: //存储收到的消息,存储为下游节点列表 for (MyValue msg : messages) { getValue().addDownVertex(msg.getSelfId()); } break; case 5: //再将本节点的值和边值发送给下游 if (hasEdges()) { MyValue msg = new MyValue(); msg.setDownVertex(getValue().getDownVertex()); msg.setUpVertex(getValue().getUpVertex()); for (Edge<Text, Text> e : getEdges()) { if (!e.getValue().equals(new Text(""))) { String id = getId().toString(); String edge = e.getValue().toString(); msg.setSelfId(new Text(id+"+"+edge); context.sendMessage(e.getDestVertexId(), msg); } } } break; case 6: //结果输出 [上游的上游节点列表,上游节点+中间节点,本节点,本节点的下游节点列表] for (MyValue msg : messages) { context.write(new Text(msg.getUpVertex().toDelimitedString(',')) msg.getSelfId(), getId(), new Text(getValue().getDownVertex().toDelimitedString(','))); } break;
}
最后再用两次trans_array和split解析便可获得全部的四度关系路径了。 <a name="5a3530b9"></a> ### [](https://www.atatech.org/articles/104874#5)环路截断 前面的讨论没有考虑环路的状况,实际中环路是很常见的,好比两我的互相关注。有环路时,输出的路径须要截断。<br />两度关系输出为3个节点,只需判断头尾不相同便可,头尾相同将头节点置为空,退化为一度关系。好比A—>B—>A截断为B—>A;<br />三度关系输出为4个节点,中间两个节点确定不相同,判断第一个节点是否和第3、第四个节点相同,相同将第一个节点截断,再判断第二个节点和第四个节点是否相同,相同的话在第二个节点处截断,即同时将第一个和第二个节点置为空。如A—>B—>C—>A截断为B—>C—>A,B—>A—>C—>A截断为C—>A;<br />四度关系同理,再也不赘述。 <a name="433531fd"></a> ### [](https://www.atatech.org/articles/104874#6)结语 本例实践了一张有1亿节点,2亿边的有向图,对其进行了关系扩散,最终的结果两度关系有221亿,三度关系有2180亿,四度关系已经上万亿了,计算耗时两度关系40分钟,三度关系90分钟左右,四度及以上整个过程的瓶颈已经不在计算了,而在MaxCompute Graph输出上,输出的耗时基本以小时为单位计算。
本文做者:云花
本文为云栖社区原创内容,未经容许不得转载