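Building a graph in GraphX is simple and takes three steps: create the vertex RDD, create the edge RDD, and combine the two into a Graph: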
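import org.apache.spark.graphx.{Edge, Graph}
import org.apache.spark.rdd.RDD
// Assumes `spark` is an existing SparkSession, as in spark-shell.
// Step 1: the vertex RDD, pairs of (vertex ID, vertex attribute)
val myVertices: RDD[(Long, String)] = spark.sparkContext.makeRDD(Array(
  (1L, "a"), (2L, "b"), (3L, "c"), (4L, "d"), (5L, "e")))
// Step 2: the edge RDD, each element is Edge(srcId, dstId, attr)
val myEdges: RDD[Edge[String]] = spark.sparkContext.makeRDD(Array(
  Edge(1L, 2L, "is-friends-with"), Edge(2L, 3L, "is-friends-with"),
  Edge(3L, 4L, "is-friends-with"), Edge(4L, 5L, "Likes-status"),
  Edge(3L, 5L, "Wrote-status")))
// Step 3: combine vertices and edges into a Graph
val myGraph: Graph[String, String] = Graph(myVertices, myEdges)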
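The Graph companion object defines an apply method, so the expression Graph(myVertices, myEdges) is really a call to Graph.apply(). Besides the vertex and edge RDDs, apply accepts defaultVertexAttr (the attribute assigned to vertices that appear in edges but not in the vertex RDD) and the storage levels for edges and vertices, both defaulting to StorageLevel.MEMORY_ONLY; the actual construction is delegated to GraphImpl. The source is: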
def apply[VD: ClassTag, ED: ClassTag](
    vertices: RDD[(VertexId, VD)],
    edges: RDD[Edge[ED]],
    defaultVertexAttr: VD = null.asInstanceOf[VD],
    edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
    vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED] = {
  GraphImpl(vertices, edges, defaultVertexAttr, edgeStorageLevel, vertexStorageLevel)
}
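To make the role of defaultVertexAttr concrete, here is a minimal plain-Scala model (no Spark required) of the vertex set that Graph.apply ends up with. It assumes the behavior described in the Graph.apply Scaladoc: vertices that appear in edges but not in the vertex RDD receive defaultVertexAttr (null when the argument is omitted, per the signature above), and duplicate vertex IDs are resolved arbitrarily. DefaultVertexAttrModel, SimpleEdge and buildVertexSet are hypothetical names, and local collections stand in for RDDs.

```scala
// Plain-Scala model (not GraphX's implementation) of how defaultVertexAttr
// fills in vertices that are only mentioned by edges.
object DefaultVertexAttrModel {
  // Hypothetical stand-in for GraphX's Edge.
  final case class SimpleEdge[ED](srcId: Long, dstId: Long, attr: ED)

  // Keep every supplied vertex, then add each edge endpoint that has no
  // supplied attribute, giving it defaultVertexAttr.
  def buildVertexSet[VD, ED](
      vertices: Seq[(Long, VD)],
      edges: Seq[SimpleEdge[ED]],
      defaultVertexAttr: VD): Map[Long, VD] = {
    val supplied = vertices.toMap // on duplicate IDs the last pair wins here; GraphX picks one arbitrarily
    val endpoints = edges.flatMap(e => Seq(e.srcId, e.dstId)).toSet
    val missing = (endpoints -- supplied.keySet).map(id => id -> defaultVertexAttr)
    supplied ++ missing
  }

  def main(args: Array[String]): Unit = {
    val vertices = Seq(1L -> "a", 2L -> "b", 3L -> "c")
    // Vertex 6 appears only as an edge endpoint, so it gets the default attribute.
    val edges = Seq(SimpleEdge(1L, 2L, "is-friends-with"), SimpleEdge(3L, 6L, "Likes-status"))
    buildVertexSet(vertices, edges, "unknown").toSeq.sortBy(_._1).foreach(println)
    // prints (1,a), (2,b), (3,c), (6,unknown), one per line
  }
}
```

Isolated vertices that no edge references are kept as-is, which matches GraphX keeping every vertex from the vertex RDD.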
In GraphX, calling vertices on a constructed graph returns the graph's vertex set as a VertexRDD:
myGraph.vertices.collect().foreach(println)
// output:
(4,d)
(1,a)
(5,e)
(2,b)
(3,c)
The source of VertexRDD is:
abstract class VertexRDD[VD](
    sc: SparkContext,
    deps: Seq[Dependency[_]]) extends RDD[(VertexId, VD)](sc, deps)
VertexRDD[VD] extends RDD[(VertexId, VD)], where VertexId is the vertex ID, which GraphX defines as a 64-bit Long (type VertexId = Long), and VD is the type of the vertex attribute.
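Because a VertexRDD is, in shape, a collection of (VertexId, VD) pairs, ordinary pair operations apply to it; on a real graph, calls such as myGraph.vertices.filter { case (id, _) => id > 3 } or myGraph.vertices.mapValues(_.toUpperCase) are the distributed counterparts. The sketch below only models that shape with a local Seq and the same sample data, so it runs without Spark; VertexSetModel, idsAbove and upperCased are hypothetical names.

```scala
// Plain-Scala model: a vertex set as a collection of (VertexId, VD) pairs,
// mirroring VertexRDD[VD] extends RDD[(VertexId, VD)]. Local Seq, not an RDD.
object VertexSetModel {
  type VertexId = Long // the same alias GraphX declares in its package object

  val vertices: Seq[(VertexId, String)] =
    Seq((1L, "a"), (2L, "b"), (3L, "c"), (4L, "d"), (5L, "e"))

  // Select IDs by destructuring each (id, attr) pair.
  def idsAbove(threshold: VertexId): Seq[VertexId] =
    vertices.collect { case (id, _) if id > threshold => id }

  // Transform attributes while keeping IDs unchanged.
  def upperCased: Seq[(VertexId, String)] =
    vertices.map { case (id, attr) => (id, attr.toUpperCase) }

  def main(args: Array[String]): Unit = {
    println(idsAbove(3L))    // List(4, 5)
    println(upperCased.head) // (1,A)
  }
}
```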
Likewise, calling edges on a constructed graph returns the graph's edge set as an EdgeRDD:
myGraph.edges.collect().foreach(println)
// output:
Edge(1,2,is-friends-with)
Edge(2,3,is-friends-with)
Edge(3,4,is-friends-with)
Edge(3,5,Wrote-status)
Edge(4,5,Likes-status)
The source of EdgeRDD is:
abstract class EdgeRDD[ED](
    sc: SparkContext,
    deps: Seq[Dependency[_]]) extends RDD[Edge[ED]](sc, deps)
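EdgeRDD[ED] extends RDD[Edge[ED]], where ED is the type of the edge attribute. The source of Edge[ED] is shown below; it represents a directed edge consisting of a source vertex, a destination vertex, and an edge attribute: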
case class Edge[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED] (
    var srcId: VertexId = 0,
    var dstId: VertexId = 0,
    var attr: ED = null.asInstanceOf[ED])
  extends Serializable
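Here ED is the type of the edge attribute, srcId is the VertexId of the source vertex, dstId is the VertexId of the destination vertex, and attr is the edge's attribute value.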
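Edge is an ordinary case class, so field access, pattern matching, and default arguments work as usual. The sketch below uses a local copy of the Edge shape (without the @specialized annotation) inside a hypothetical EdgeModel object, so it runs without Spark. It selects the out-edges of a vertex by comparing srcId, which is meaningful because each edge points from srcId to dstId; on a real graph, myGraph.edges.filter(e => e.srcId == 3L) plays the same role.

```scala
// Plain-Scala model of GraphX's Edge shape; EdgeModel is a hypothetical name.
object EdgeModel {
  type VertexId = Long

  // Same fields and defaults as the Edge source above, minus @specialized.
  case class Edge[ED](
      var srcId: VertexId = 0,
      var dstId: VertexId = 0,
      var attr: ED = null.asInstanceOf[ED])

  val edges: Seq[Edge[String]] = Seq(
    Edge(1L, 2L, "is-friends-with"), Edge(2L, 3L, "is-friends-with"),
    Edge(3L, 4L, "is-friends-with"), Edge(4L, 5L, "Likes-status"),
    Edge(3L, 5L, "Wrote-status"))

  // Edges are directed srcId -> dstId, so out-edges are matched on srcId.
  def outEdges(v: VertexId): Seq[Edge[String]] = edges.filter(_.srcId == v)

  def main(args: Array[String]): Unit = {
    outEdges(3L).foreach(println) // Edge(3,4,is-friends-with), then Edge(3,5,Wrote-status)
    println(Edge[String]())       // Edge(0,0,null): all default arguments
  }
}
```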
Calling triplets on a constructed graph returns an RDD[EdgeTriplet[VD, ED]], that is, an RDD whose elements are of type EdgeTriplet[VD, ED]:
myGraph.triplets.collect().foreach(println)
// output:
((1,a),(2,b),is-friends-with)
((2,b),(3,c),is-friends-with)
((3,c),(4,d),is-friends-with)
((3,c),(5,e),Wrote-status)
((4,d),(5,e),Likes-status)
The source of EdgeTriplet is:
class EdgeTriplet[VD, ED] extends Edge[ED] {
  /**
   * The source vertex attribute
   */
  var srcAttr: VD = _ // nullValue[VD]

  /**
   * The destination vertex attribute
   */
  var dstAttr: VD = _ // nullValue[VD]

  /**
   * Set the edge properties of this triplet.
   */
  protected[spark] def set(other: Edge[ED]): EdgeTriplet[VD, ED] = {
    srcId = other.srcId
    dstId = other.dstId
    attr = other.attr
    this
  }
  ...
}
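EdgeTriplet[VD, ED] extends Edge[ED], where VD is the vertex attribute type and ED is the edge attribute type. It adds two member variables, srcAttr and dstAttr, which hold the attribute values of the source and destination vertices respectively.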
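Conceptually, triplets joins every edge with the attributes of its two endpoints. The following plain-Scala model (a hypothetical TripletModel object, with a local Map lookup in place of GraphX's distributed join) builds the same five triplets from the sample data and prints them in the same ((srcId,srcAttr),(dstId,dstAttr),attr) layout as the output above. It assumes every edge endpoint is present in the vertex map, which is what defaultVertexAttr guarantees once a GraphX graph has been built.

```scala
// Plain-Scala model of what graph.triplets yields; not GraphX's implementation.
object TripletModel {
  type VertexId = Long

  final case class Edge[ED](srcId: VertexId, dstId: VertexId, attr: ED)

  // Mirrors EdgeTriplet: the Edge fields plus srcAttr and dstAttr.
  final case class Triplet[VD, ED](srcId: VertexId, srcAttr: VD,
                                   dstId: VertexId, dstAttr: VD, attr: ED) {
    override def toString: String = s"(($srcId,$srcAttr),($dstId,$dstAttr),$attr)"
  }

  // Join each edge with its endpoints' attributes (throws if an endpoint is missing).
  def triplets[VD, ED](vertices: Map[VertexId, VD], edges: Seq[Edge[ED]]): Seq[Triplet[VD, ED]] =
    edges.map(e => Triplet(e.srcId, vertices(e.srcId), e.dstId, vertices(e.dstId), e.attr))

  def main(args: Array[String]): Unit = {
    val vertices = Map(1L -> "a", 2L -> "b", 3L -> "c", 4L -> "d", 5L -> "e")
    val edges = Seq(
      Edge(1L, 2L, "is-friends-with"), Edge(2L, 3L, "is-friends-with"),
      Edge(3L, 4L, "is-friends-with"), Edge(3L, 5L, "Wrote-status"),
      Edge(4L, 5L, "Likes-status"))
    val ts = triplets(vertices, edges)
    ts.foreach(println)
    // A typical use of triplets: turn each one into a readable sentence.
    ts.map(t => s"${t.srcAttr} ${t.attr} ${t.dstAttr}").foreach(println)
  }
}
```

On a real graph the sentence-building step would be myGraph.triplets.map(t => s"${t.srcAttr} ${t.attr} ${t.dstAttr}"), since EdgeTriplet exposes srcAttr, attr and dstAttr directly.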
The UML diagram of the Graph class in GraphX and its dependencies is shown below: