PageRank,即网页排名,又称网页级别、Google 左侧排名或佩奇排名算法
是Google 创始人拉里·佩奇和谢尔盖·布林于1997 年构建早期的搜索系统原型时提出的链 接分析算法,在揉合了诸如Title 标识和Keywords 标识等全部其它因素以后,Google 经过PageRank 来调整结果,使那些更具“等级/重要性”的网页在搜索结果中另网站排名得到提高,从而提升搜索结果的相关性和质量apache
PageRank的计算基于如下两个基本假设:浏览器
数量假设:在Web 图模型中,若是一个页面节点接收到的其余网页指向的入链数量越多,那么这个页面越重要。ide
质量假设:指向页面A的入链质量不一样,质量高的页面会经过连接向其余页面传递更多的权重。因此越是质量高的页面指向页面A,则页面A越重要。网站
利用以上两个假设,PageRank算法刚开始赋予每一个网页相同的重要性得分,经过迭代递归计算来更新每一个页面节点的PageRank得分,直到得分稳定为止。PageRank计算得出的结果是网页的重要性评价,这和用户输入的查询是没有任何关系的,即算法是主题无关的ui
PageRank的计算充分利用了两个假设:数量假设和质量假设。步骤以下:搜索引擎
1) 在初始阶段:网页经过连接关系构建起Web图,每一个页面设置相同的PageRank值,经过若干轮的计算,会获得每一个页面所得到的最终PageRank值。随着每一轮的计算进行,网页当前的PageRank值会不断获得更新atom
2) 在一轮中更新页面PageRank得分的计算方法: 在一轮更新页面PageRank得分的计算中,每一个页面将其当前的PageRank值平均分配到本页面包含的出链上,这样每一个连接即得到了相应的权值。而每一个页面将全部指向本页面的入链所传入的权值求和,便可获得新的PageRank得分。当每一个页面都得到了更新后的PageRank值,就完成了一轮PageRank计 算spa
若是网页T存在一个指向网页A的链接,则代表T的全部者认为A比较重要,从而把T的一部分重要性得分赋予A。这个重要性得分值为:PR(T)/L(T)scala
其中PR(T)为T的PageRank值,L(T)为T的出链数
则A的PageRank值为一系列相似于T的页面重要性得分值的累加
即一个页面的得票数由全部链向它的页面的重要性来决定,到一个页面的超连接至关于对该页投一票。一个页面的PageRank是由全部链向它的页面(链入页面)的重要性通过递归算法获得的。一个有较多链入的页面会有较高的等级,相反若是一个页面没有任何链入页面,那么它没有等级
咱们设向量B为第1、第二...第 N 个网页的网页排名
矩阵A表明网页之间的权重输出关系,其中amn表明第m个网页向第n个网页的输出权重
输出权重计算较为简单:假设m一共有10个出链,指向n的一共有2个,那么m向n输出的权重就为2/10
如今问题变为:A是已知的,咱们要经过计算获得B。 假设Bi是第i次迭代的结果,那么
初始假设全部网页的排名都是 1/N (N为网页总数量),即
经过上述迭代计算,最终Bi会收敛,即Bi无限趋近于B,此时B = B × A
假设有网页A、B、C、D,它们之间的连接关系以下图所示
计算B1以下:
不断迭代,计算结果以下:
第 1 次迭代: 0.125, 0.333, 0.083, 0.458 第 2 次迭代: 0.042, 0.500, 0.042, 0.417 第 3 次迭代: 0.021, 0.431, 0.014, 0.535 第 4 次迭代: 0.007, 0.542, 0.007, 0.444 第 5 次迭代: 0.003, 0.447, 0.002, 0.547 第 6 次迭代: 0.001, 0.549, 0.001, 0.449 第 7 次迭代: 0.001, 0.449, 0.000, 0.550 第 8 次迭代: 0.000, 0.550, 0.000, 0.450 第 9 次迭代: 0.000, 0.450, 0.000, 0.550 第 10次迭代: 0.000, 0.550, 0.000, 0.450
咱们能够发现,A和C的权重变为0,而B和D的权重也趋于在0.5附近摆动。从图中也能够观察出:A和C之间有互相连接,但它们又把权重输出给了B和D,而B和D 之间互相连接,并不向A或C输出任何权重,因此长此以往权重就都转移到B和D了
上面是最简单正常的状况,考虑一下两种特殊状况:
第一种状况是,B存在导向本身的连接,迭代计算过程是:
第 1 次迭代: 0.125, 0.583, 0.083, 0.208 第 2 次迭代: 0.042, 0.833, 0.042, 0.083 第 3 次迭代: 0.021, 0.931, 0.014, 0.035 第 4 次迭代: 0.007, 0.972, 0.007, 0.014 第 5 次迭代: 0.003, 0.988, 0.002, 0.006 第 6 次迭代: 0.001, 0.995, 0.001, 0.002 第 7 次迭代: 0.001, 0.998, 0.000, 0.001 第 8 次迭代: 0.000, 0.999, 0.000, 0.000 第 9 次迭代: 0.000, 1.000, 0.000, 0.000 第10次迭代: 0.000, 1.000, 0.000, 0.000 ... ...
咱们发现最终B权重变为1,其它全部网页的权重都变为了0
第二种状况是B是孤立于其它网页的,既没有入链也没有出链,迭代计算过程是:
第 1 次迭代: 0.125, 0.000, 0.125, 0.250 第 2 次迭代: 0.063, 0.000, 0.063, 0.125 第 3 次迭代: 0.031, 0.000, 0.031, 0.063 第 4 次迭代: 0.016, 0.000, 0.016, 0.031 第 5 次迭代: 0.008, 0.000, 0.008, 0.016 第 6 次迭代: 0.004, 0.000, 0.004, 0.008 第 7 次迭代: 0.002, 0.000, 0.002, 0.004 第 8 次迭代: 0.001, 0.000, 0.001, 0.002 第 9 次迭代: 0.000, 0.000, 0.000, 0.001 第10次迭代: 0.000, 0.000, 0.000, 0.000 ... ...
咱们发现全部网页权重都变为了 0
出现这种状况是由于上面的数学模型出现了问题,该模型认为上网者从一个网页浏览下一个网页都是经过页面的超连接。想象一下正常的上网情景,其实咱们在看完一个网页后,可能直接在浏览器输入一个网址,而不经过上一个页面的超连接总数,设矩阵e 以下:
设用户经过页面超连接浏览下一网页的几率为 α,则直接访问的方式浏览下一个网 页的几率为 1 - α,改进上一节的迭代公式为:
一般状况下设 α 为 0.8,上一节”具体示例”的计算变为以下:
迭代过程以下:
第 1 次迭代: 0.150, 0.317, 0.117, 0.417 第 2 次迭代: 0.097, 0.423, 0.090, 0.390 第 3 次迭代: 0.086, 0.388, 0.076, 0.450 第 4 次迭代: 0.080, 0.433, 0.073, 0.413 第 5 次迭代: 0.079, 0.402, 0.071, 0.447 第 6 次迭代: 0.079, 0.429, 0.071, 0.421 第 7 次迭代: 0.078, 0.408, 0.071, 0.443 第 8 次迭代: 0.078, 0.425, 0.071, 0.426 第 9 次迭代: 0.078, 0.412, 0.071, 0.439 第10次迭代: 0.078, 0.422, 0.071, 0.428 第11次迭代: 0.078, 0.414, 0.071, 0.437 第12次迭代: 0.078, 0.421, 0.071, 0.430 第13次迭代: 0.078, 0.415, 0.071, 0.436 第14次迭代: 0.078, 0.419, 0.071, 0.431 第15次迭代: 0.078, 0.416, 0.071, 0.435 第16次迭代: 0.078, 0.419, 0.071, 0.432 第17次迭代: 0.078, 0.416, 0.071, 0.434 第18次迭代: 0.078, 0.418, 0.071, 0.432 第19次迭代: 0.078, 0.417, 0.071, 0.434 第20次迭代: 0.078, 0.418, 0.071, 0.433 ... ...
因为存在一些出链为0,也就是那些不连接任何其余网页的网,也称为孤立网页,使得不少网页能被访问到。所以须要对PageRank 公式进行修正,即在简单公式的基础上增长了阻尼系数(damping factor)q,q 通常取值q=0.85
其意义是,在任意时刻,用户到达某页面后并继续向后浏览的几率。1- q= 0.15 就是用户中止点击,随机跳到新URL 的几率)的算法被用到了全部页面上,估算页面可能被上网者放入书签的几率。
最后,即全部这些被换算为一个百分比再乘上一个系数q。因为下面的算法,没有页面的PageRank 会是0。因此,Google 经过数学系统给了每一个页面一个最小值
这个公式就是.S Brin 和L. Page 在《The Anatomy of a Large- scale Hypertextual Web Search Engine Computer Networks and ISDN Systems 》定义的公式。
因此一个页面的PageRank 是由其余页面的PageRank 计算获得。Google 不断的重复计算每一个页面的PageRank。若是给每一个页面一个随机PageRank 值(非0),那么通过不断的重复计算,这些页面的PR 值会趋向于正常和稳定。这就是搜索引擎使用它的缘由
首先求完整的公式:
Arvind Arasu 在《Junghoo Cho Hector Garcia - Molina, Andreas Paepcke, Sriram Raghavan. Searching the Web》 更加准确的表达为:
是被研究的页面,
是链入页面的数量,
是链出页面的数量,而N是全部页面的数量
PageRank值是一个特殊矩阵中的特征向量。这个特征向量为:
R是以下等式的一个解:
若是网页i有指向网页j的一个连接,则
不然 =0
import org.apache.spark.graphx.GraphLoader // Load the edges as a graph val graph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt") // Run PageRank val ranks = graph.pageRank(0.0001).vertices // Join the ranks with the usernames val users = sc.textFile("data/graphx/users.txt").map { line => val fields = line.split(",") (fields(0).toLong, fields(1)) } val ranksByUsername = users.join(ranks).map { case (id, (username, rank)) => (username, rank) } // Print the result println(ranksByUsername.collect().mkString("\n"))
val graph = GraphLoader.edgeListFile(sc, "graphx/data/test_graph.txt") val root: VertexId = 1 val initialGraph = graph.mapVertices((id, _) => if (id == root) 0.0 else Double.PositiveInfinity) val vprog = { (id: VertexId, attr: Double, msg: Double) => math.min(attr,msg) } val sendMessage = { (triplet: EdgeTriplet[Double, Int]) => var iter:Iterator[(VertexId, Double)] = Iterator.empty val isSrcMarked = triplet.srcAttr != Double.PositiveInfinity val isDstMarked = triplet.dstAttr != Double.PositiveInfinity if(!(isSrcMarked && isDstMarked)){ if(isSrcMarked){ iter = Iterator((triplet.dstId,triplet.srcAttr+1)) }else{ iter = Iterator((triplet.srcId,triplet.dstAttr+1)) } } iter } val reduceMessage = { (a: Double, b: Double) => math.min(a,b) } val bfs = initialGraph.pregel(Double.PositiveInfinity, 20)(vprog, sendMessage, reduceMessage) println(bfs.vertices.collect.mkString("\n"))
import scala.reflect.ClassTag import org.apache.spark.graphx._ /** * Computes shortest paths to the given set of landmark vertices, returning a graph where each * vertex attribute is a map containing the shortest-path distance to each reachable landmark. */ object ShortestPaths { /** Stores a map from the vertex id of a landmark to the distance to that landmark. */ type SPMap = Map[VertexId, Int] private def makeMap(x: (VertexId, Int)*) = Map(x: _*) private def incrementMap(spmap: SPMap): SPMap = spmap.map { case (v, d) => v -> (d + 1) } private def addMaps(spmap1: SPMap, spmap2: SPMap): SPMap = (spmap1.keySet ++ spmap2.keySet).map { k => k -> math.min(spmap1.getOrElse(k, Int.MaxValue), spmap2.getOrElse(k, Int.MaxValue)) }.toMap /** * Computes shortest paths to the given set of landmark vertices. * * @tparam ED the edge attribute type (not used in the computation) * * @param graph the graph for which to compute the shortest paths * @param landmarks the list of landmark vertex ids. Shortest paths will be computed to each * landmark. * * @return a graph where each vertex attribute is a map containing the shortest-path distance to * each reachable landmark vertex. */ def run[VD, ED: ClassTag](graph: Graph[VD, ED], landmarks: Seq[VertexId]): Graph[SPMap, ED] = { val spGraph = graph.mapVertices { (vid, attr) => if (landmarks.contains(vid)) makeMap(vid -> 0) else makeMap() } val initialMessage = makeMap() def vertexProgram(id: VertexId, attr: SPMap, msg: SPMap): SPMap = { addMaps(attr, msg) } def sendMessage(edge: EdgeTriplet[SPMap, _]): Iterator[(VertexId, SPMap)] = { val newAttr = incrementMap(edge.dstAttr) if (edge.srcAttr != addMaps(newAttr, edge.srcAttr)) Iterator((edge.srcId, newAttr)) else Iterator.empty } Pregel(spGraph, initialMessage)(vertexProgram, sendMessage, addMaps) } }
import scala.reflect.ClassTag import org.apache.spark.graphx._ /** Connected components algorithm. */ object ConnectedComponents { /** * Compute the connected component membership of each vertex and return a graph with the vertex * value containing the lowest vertex id in the connected component containing that vertex. * @tparam VD the vertex attribute type (discarded in the computation) * @tparam ED the edge attribute type (preserved in the computation) * @param graph the graph for which to compute the connected components * @param maxIterations the maximum number of iterations to run for * @return a graph with vertex attributes containing the smallest vertex in each connected component */ def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED], maxIterations: Int): Graph[VertexId, ED] = { require(maxIterations > 0, s"Maximum of iterations must be greater than 0," + s" but got ${maxIterations}") val ccGraph = graph.mapVertices { case (vid, _) => vid } def sendMessage(edge: EdgeTriplet[VertexId, ED]): Iterator[(VertexId, VertexId)] = { if (edge.srcAttr < edge.dstAttr) { Iterator((edge.dstId, edge.srcAttr)) } else if (edge.srcAttr > edge.dstAttr) { Iterator((edge.srcId, edge.dstAttr)) } else { Iterator.empty } } val initialMessage = Long.MaxValue val pregelGraph = Pregel(ccGraph, initialMessage, maxIterations, EdgeDirection.Either)( vprog = (id, attr, msg) => math.min(attr, msg), sendMsg = sendMessage, mergeMsg = (a, b) => math.min(a, b)) ccGraph.unpersist() pregelGraph } // end of connectedComponents /** * Compute the connected component membership of each vertex and return a graph with the vertex * value containing the lowest vertex id in the connected component containing that vertex. * * @tparam VD the vertex attribute type (discarded in the computation) * @tparam ED the edge attribute type (preserved in the computation) * @param graph the graph for which to compute the connected components * @return a graph with vertex attributes containing the smallest vertex in eac connected component */ def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[VertexId, ED] = { run(graph, Int.MaxValue) } }
import scala.reflect.ClassTag import org.apache.spark.graphx._ /** * Compute the number of triangles passing through each vertex. * * The algorithm is relatively straightforward and can be computed in three steps: * * <ul> * <li> Compute the set of neighbors for each vertex.</li> * <li> For each edge compute the intersection of the sets and send the count to both vertices.</li> * <li> Compute the sum at each vertex and divide by two since each triangle is counted twice.</li> * </ul> * * There are two implementations. The default `TriangleCount.run` implementation first removes * self cycles and canonicalizes the graph to ensure that the following conditions hold: *<ul> * <li> There are no self edges </li> * <li> All edges are oriented src > dst </li> * <li> There are no duplicate edges </li> * </ul> * However, the canonicalization procedure is costly as it requires repartitioning the graph. * If the input data is already in "canonical form" with self cycles removed then the * `TriangleCount.runPreCanonicalized` should be used instead. * {{{ * val canonicalGraph = graph.mapEdges(e => 1).removeSelfEdges().canonicalizeEdges() * val counts = TriangleCount.runPreCanonicalized(canonicalGraph).vertices * }}} * */ object TriangleCount { def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[Int, ED] = { // Transform the edge data something cheap to shuffle and then canonicalize val canonicalGraph = graph.mapEdges(e => true).removeSelfEdges().convertToCanonicalEdges() // Get the triangle counts val counters = runPreCanonicalized(canonicalGraph).vertices // Join them bath with the original graph graph.outerJoinVertices(counters) { (vid, _, optCounter: Option[Int]) => optCounter.getOrElse(0) } } def runPreCanonicalized[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[Int, ED] = { // Construct set representations of the neighborhoods val nbrSets: VertexRDD[VertexSet] = graph.collectNeighborIds(EdgeDirection.Either).mapValues { (vid, nbrs) => val set = new VertexSet(nbrs.length) var i = 0 while (i < nbrs.length) { // prevent self cycle if (nbrs(i) != vid) { set.add(nbrs(i)) } i += 1 } set } // join the sets with the graph val setGraph: Graph[VertexSet, ED] = graph.outerJoinVertices(nbrSets) { (vid, _, optSet) => optSet.getOrElse(null) } // Edge function computes intersection of smaller vertex with larger vertex def edgeFunc(ctx: EdgeContext[VertexSet, ED, Int]) { val (smallSet, largeSet) = if (ctx.srcAttr.size < ctx.dstAttr.size) { (ctx.srcAttr, ctx.dstAttr) } else { (ctx.dstAttr, ctx.srcAttr) } val iter = smallSet.iterator var counter: Int = 0 while (iter.hasNext) { val vid = iter.next() if (vid != ctx.srcId && vid != ctx.dstId && largeSet.contains(vid)) { counter += 1 } } ctx.sendToSrc(counter) ctx.sendToDst(counter) } // compute the intersection along edges val counters: VertexRDD[Int] = setGraph.aggregateMessages(edgeFunc, _ + _) // Merge counters with the graph and divide by two since each triangle is counted twice graph.outerJoinVertices(counters) { (_, _, optCounter: Option[Int]) => val dblCount = optCounter.getOrElse(0) // This algorithm double counts each triangle so the final count should be even require(dblCount % 2 == 0, "Triangle count resulted in an invalid number of triangles.") dblCount / 2 } } }