Spark图处理GraphX学习笔记!

Spark图处理GraphX学习笔记!web


1、什么是GraphX?算法

Graphx利用了Spark这样了一个并行处理框架来实现了图上的一些可并行化执行的算法sql


  • 算法是否可以并行化与Spark自己无关数据结构

  • 算法并行化与否的自己,须要经过数学来证实框架

  • 已经证实的可并行化算法,利用Spark来实现会是一个错的选择,由于Graphx支持pregel的图计算模型dom


2、Graphx包含哪些组件和基本框架?ide


一、成员变量

graph中重要的成员变量分别为函数

  1. verticespost

  2. edges学习

  3. triplets

为何要引入triplets呢,主要是和Pregel这个计算模型相关,在triplets中,同时记录着edge和vertex. 具体代码就不罗列了。

二、成员函数

函数分红几大类

  1. 对全部顶点或边的操做,但不改变图结构自己,如mapEdges, mapVertices

  2. 子图,相似于集合操做中的filter subGraph

  3. 图的分割,即paritition操做,这个对于Spark计算来讲,很关键,正是由于有了不一样的Partition,才有了并行处理的可能, 不一样的PartitionStrategy,其收益不一样。最容易想到的就是利用Hash来将整个图分红多个区域。

  4. outerJoinVertices 顶点的外链接操做


3、图的运算和操做 GraphOps

图的经常使用算法是集中抽象到GraphOps这个类中,在Graph里做了隐式转换,将Graph转换为GraphOps,具体有下列12个算子:

  1. collectNeighborIds

  2. collectNeighbors

  3. collectEdges

  4. joinVertices

  5. filter

  6. pickRandomVertex

  7. pregel

  8. pageRank

  9. staticPageRank

  10. connectedComponents

  11. triangleCount

  12. stronglyConnectedComponents


RDD

RDD是Spark体系的核心,那么Graphx中引入了哪些新的RDD呢,有俩,分别为

  1. VertexRDD

  2. EdgeRDD

较之EdgeRdd,VertexRDD更为重要,其上的操做也不少,主要集中于Vertex之上属性的合并,说到合并就不得不扯到关系代数和集合论,因此在VertexRdd中能看到许多相似于sql中的术语,如

  • leftJoin

  • innerJoin


4、GraphX场景分析


一、图的存储和加载

在进行数学计算的时候,图用线性代数中的矩阵来表示,那么如何进行存储呢?

数据结构的时候,老师确定说过好多的办法,再也不啰嗦了。

不过在大数据的环境下,若是图很巨大,表示顶点和边的数据不足以放在一个文件中怎么办? 用HDFS

加载的时候,一台机器的内存不足以容下怎么办? 延迟加载,在真正须要数据时,将数据分发到不一样机器中,采用级联方式。

通常来讲,咱们会将全部与顶点相关的内容保存在一个文件中vertexFile,全部与边相关的信息保存在另外一个文件中edgeFile。

生成某一个具体的图时,用edge就能够表示图中顶点的关联关系,同时图的结构也表示出来了。

下面是Spark官方示例,用2个Array构造了一个Graph。

val users: RDD[(VertexId, (String, String))] =

  sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),

                       (5L, ("franklin", "prof")), (2L, ("istoica", "prof"))))

// Create an RDD for edges

val relationships: RDD[Edge[String]] =

  sc.parallelize(Array(Edge(3L, 7L, "collab"),    Edge(5L, 3L, "advisor"),

                       Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi")))

// Define a default user in case there are relationship with missing user

val defaultUser = ("John Doe", "Missing")



// Build the initial Graph

val graph = Graph(users, relationships, defaultUser)

二、GraphLoader

graphLoader是graphx中专门用于图的加载和生成,最重要的函数就是edgeListFile。

//以顶点划分,分红4个分区

val graph = GraphLoader.edgeListFile(sc,"hdfs://192.168.0.10:9000/input/graph/web-Google.txt",minEdgePartitions = 4)


5、GraphX应用举例

一行代码:

val rank = graph.pageRank(0.01).vertices



用RDD实现:

完整代码

// Connect to the Spark clusterval 
sc = new SparkContext("spark://master.amplab.org", "research")
// Load my user data and parse into tuples of user id and attribute list
val users = (sc.textFile("graphx/data/users.txt")
  .map(line => line.split(","))
  .map( parts => (parts.head.toLong, parts.tail) ))
  // Parse the edge data which is already in userId -> userId format
  val followerGraph = GraphLoader.edgeListFile(sc, "graphx/data/followers.txt")
  // Attach the user attributes
  val graph = followerGraph.outerJoinVertices(users) { 
   case (uid, deg, Some(attrList)) => attrList  
   // Some users may not have attributes so we set them as empty
    case (uid, deg, None) => Array.empty[String]
    }
// Restrict the graph to users with usernames and names
val subgraph = graph.subgraph(vpred = (vid, attr) => attr.size == 2)
// Compute the PageRank

// Get the attributes of the top pagerank users
val userInfoWithPageRank = subgraph.outerJoinVertices(pagerankGraph.vertices) { 
 case (uid, attrList, Some(pr)) => (pr, attrList.toList) 
  case (uid, attrList, None) => (0.0, attrList.toList)
}

println(userInfoWithPageRank.vertices.top(5)(Ordering.by(_._2._1)).mkString("\n"))
相关文章
相关标签/搜索