GraphX内置了许多函数实现图运算操做。ide
mapVertices
的功能是transform each vertex attribute in the graph using the map function。即对已有图的顶点属性作转换。函数
def mapVertices[VD2: ClassTag](map: (VertexId, VD) => VD2) (implicit eq: VD =:= VD2 = null): Graph[VD2, ED]
其中,VD2
为转换后的顶点属性类型,map
定义了转换函数ui
myGraph.vertices.collect().foreach(println) (4,d) (1,a) (5,e) (2,b) (3,c) myGraph.mapVertices[Int]((vertexId, _) => if (vertexId < 2L) 0 else 1).vertices.collect().foreach(println) (4,1) (1,0) (5,1) (2,1) (3,1)
mapEdges
对已有图的边属性作转换,transforms each edge attribute in the graph using the map function。scala
// (1) def mapEdges[ED2: ClassTag](map: Edge[ED] => ED2): Graph[VD, ED2] = { mapEdges((pid, iter) => iter.map(map)) } // (2) def mapEdges[ED2: ClassTag](map: (PartitionID, Iterator[Edge[ED]]) => Iterator[ED2]) : Graph[VD, ED2]
ED2
为转换后的边属性类型,Edge[ED]
只包含边的属性值和与边相连的顶点的VertexId,不包含顶点的属性值。第一个mapEdges
在内部调用了第二个mapEdges,第二个mapEdges中的map函数,以一个分区的全部Edge做为输入,对边的属性进行转换。code
myGraph.edges.collect().foreach(println) Edge(1,2,is-friends-with) Edge(2,3,is-friends-with) Edge(3,4,is-friends-with) Edge(3,5,Wrote-status) Edge(4,5,Likes-status) myGraph.mapEdges(e => if (e.attr == "is-friends-with") 0 else 1).edges.collect().foreach(println) Edge(1,2,0) Edge(2,3,0) Edge(3,4,0) Edge(3,5,1) Edge(4,5,1)
def mapTriplets[ED2: ClassTag](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2] = { mapTriplets((pid, iter) => iter.map(map), TripletFields.All) } def mapTriplets[ED2: ClassTag]( map: EdgeTriplet[VD, ED] => ED2, tripletFields: TripletFields): Graph[VD, ED2] = { mapTriplets((pid, iter) => iter.map(map), tripletFields) }
关于mapTriplets
的功能,官方的描述是Transforms each edge attribute using the map function, passing it the adjacent vertex attributes as well. If adjacent vertex values are not required, consider using mapEdges
instead。mapTriplets一样是对已有图的边属性进行转换,只不过EdgeTriplet
包含与边相邻的顶点的属性值,而Edge
不包含。orm
myGraph.mapTriplets(et => if (et.attr == "is-friends-with" && et.srcAttr == "a") 0 else 1).edges.collect().foreach(println) Edge(1,2,0) Edge(2,3,1) Edge(3,4,1) Edge(3,5,1) Edge(4,5,1)
值得一提的是,mapVertices
的map函数是以单个顶点做为输入,而mapEdges
和mapTriplets
的map函数是以一个分区的全部边做为输入,这是由于GraphX使用点切分的方式存储图。ip
有时候,咱们但愿将额外的属性合并到一个图的顶点属性中去,可使用joinVertice
操做。ci
def joinVertices[U: ClassTag](table: RDD[(VertexId, U)])(mapFunc: (VertexId, VD, U) => VD) : Graph[VD, ED] = { val uf = (id: VertexId, data: VD, o: Option[U]) => { o match { case Some(u) => mapFunc(id, data, u) case None => data } } graph.outerJoinVertices(table)(uf) }
其中,table
为额外的属性,mapFunc
定义了如何将额外的属性和顶点的已有属性进行合并。使用joinVertice
会返回一个新的带有顶点属性的图。在执行mapFunc时,顶点的VertexId、顶点的属性值会与该顶点对应的额外属性进行匹配,由case None => data
可知,若是一个顶点没有对应的额外属性,则会保留该顶点的原有属性值。get
已有图的顶点属性为顶点名称(a,b,c,...),需求是将地点属性修改成顶点的“出度”。源码
myGraph.joinVertices(outDegrees)((_,_, d) => d.toString).vertices.collect().foreach(println) (4,1) (1,1) (5,e) (2,1) (3,2)
发现,VertexId为1,2,3,4的顶点都正确实现了需求,但VertexId为5的顶点,没有对应的额外属性值(顶点5的出度为0,在outDegrees中没有记录),于是顶点5的属性值没有改变,若是想要实现将出度为0的顶点的属性值修改成0,有两种方法,
outerJoinVertices
操做。从joinVertices的源码能够发现,joinVertices是outerJoinVertices
的一个特例。
已有图的顶点属性为顶点名称(a,b,c,...),需求是将地点属性修改成顶点的“出度”。
// 方法1 val outDegrees: VertexRDD[Int] = myGraph.aggregateMessages[Int](_.sendToSrc(1), _ + _) myGraph.outerJoinVertices(outDegrees)((_,_, d) => d).vertices.collect().foreach(println) (4,Some(1)) (1,Some(1)) (5,None) (2,Some(1)) (3,Some(2)) //方法2 val outDegrees: VertexRDD[Int] = myGraph.aggregateMessages[Int](_.sendToSrc(1), _ + _) myGraph.outerJoinVertices(outDegrees)((_,_, d) => d.getOrElse(0)).vertices.collect().foreach(println) (4,1) (1,1) (5,0) (2,1) (3,2)
能够发现,调用outerJoinVertices,顶点在匹配其额外属性时,会将其匹配的额外属性转换为Option
类型,所以能够经过getOrElse
方法指定默认值的方式,为出度为0且没有记录在outDegrees中的顶点设置正确的额外属性值。
aggregateMessages的主要功能是向邻边发消息,而后合并邻边收到的消息。
def aggregateMessages[A: ClassTag]( sendMsg: EdgeContext[VD, ED, A] => Unit, mergeMsg: (A, A) => A, tripletFields: TripletFields = TripletFields.All) : VertexRDD[A] = { aggregateMessagesWithActiveSet(sendMsg, mergeMsg, tripletFields, None) }
其中,A
为消息的类型,aggregateMessages的返回值为VertexRDD[A]
sendMsg函数用来发送消息,它以EdgeContext
做为参数,没有返回值。EdgeContext
与EdgeTriplet
有些相似,成员变量都包含:srcId、dstId、srcAttr、dstAttr和attr。可是EdgeContext抽象类提升了两个发送消息的方法:sendToSrc
和sendToDst
,sendToSrc将类型为A的信息发送给源顶点,sendToDst将类型为A 的信息发送给目标顶点,toEdgeTriplet
方法实现了从EdgeContext到EdgeTriplet的转换。
/** * Represents an edge along with its neighboring vertices and allows sending messages along the * edge. Used in [[Graph#aggregateMessages]]. */ abstract class EdgeContext[VD, ED, A] { /** The vertex id of the edge's source vertex. */ def srcId: VertexId /** The vertex id of the edge's destination vertex. */ def dstId: VertexId /** The vertex attribute of the edge's source vertex. */ def srcAttr: VD /** The vertex attribute of the edge's destination vertex. */ def dstAttr: VD /** The attribute associated with the edge. */ def attr: ED /** Sends a message to the source vertex. */ def sendToSrc(msg: A): Unit /** Sends a message to the destination vertex. */ def sendToDst(msg: A): Unit /** Converts the edge and vertex properties into an [[EdgeTriplet]] for convenience. */ def toEdgeTriplet: EdgeTriplet[VD, ED] = { val et = new EdgeTriplet[VD, ED] et.srcId = srcId et.srcAttr = srcAttr et.dstId = dstId et.dstAttr = dstAttr et.attr = attr et } }12
mergeMsg函数用来合并消息。每一个顶点收到的全部消息都会被汇集起来传递给mergeMsg函数,mergeMsg对其合并获得最终结果。
统计图中顶点的“出度”。
myGraph.aggregateMessages[Int](_.sendToSrc(1), _+_).collect().foreach(println) (4,1) (1,1) (2,1) (3,2)