佩奇排名(PageRank),又称 网页排名、 谷歌左侧排名,是一种由 搜索引擎根据 网页之间相互的 超连接计算的技术,而做为网页排名的要素之一,以 Google公司创办人 拉里·佩奇(Larry Page)之姓来命名。 Google用它来体现网页的相关性和重要性,在 搜索引擎优化操做中是常常被用来评估网页优化的成效因素之一。
Spark中有一个很重要的特性是对数据集在节点间的分区进行控制,由于在分布式系统中,通讯的代价是很大的,所以控制数据分布以得到最少的网络传输能够极大地提高总体性能,Spark程序能够经过控制RDD分区方式来减小通讯开销。分区适用于那种基于相似join操做基于键的操做,而且一方的RDD数据是比较少变更且须要屡次扫描的状况,这个时候能够对这个RDD作一个分区,最经常使用的是用Hash来进行分区,好比能够对RDD分100个区,此时spark会用每一个键的hash值对100取模,而后把相同结果的放到同一个节点上。html
如今用一个例子(来自《Learning Spark: Lightning-Fast Big Data Analysis》一书)来讲明一下:算法
// Initialization code; we load the user info from a Hadoop SequenceFile on HDFS. // This distributes elements of userData by the HDFS block where they are found, // and doesn't provide Spark with any way of knowing in which partition a // particular UserID is located. val sc = new SparkContext(...) val userData = sc.sequenceFile[UserID, UserInfo]("hdfs://...").persist() // Function called periodically to process a logfile of events in the past 5 minutes; // we assume that this is a SequenceFile containing (UserID, LinkInfo) pairs. def processNewLogs(logFileName: String) { val events = sc.sequenceFile[UserID, LinkInfo](logFileName) val joined = userData.join(events)// RDD of (UserID, (UserInfo, LinkInfo)) pairs val offTopicVisits = joined.filter { case (userId, (userInfo, linkInfo)) => // Expand the tuple into its components !userInfo.topics.contains(linkInfo.topic) }.count() println("Number of visits to non-subscribed topics: " + offTopicVisits) }
上面的例子中,有两个RDD,userData的键值对是(UserID, UserInfo),UserInfo包含了一个该用户订阅的主题的列表,该程序会周期性地将这张表与一个小文件进行组合,这个小文件中存着过去五分钟内某个网站各用户的访问状况,由(UserID, LinkInfo)。如今,咱们须要对用户访问其未订阅主题的页面进行统计。能够经过Spark的join()操做来完成这个功能,其中须要把UserInfo和LinkInfo的有序对根据UserID进行分组,如上代码。缓存
能够看出,由于每次调用processNewLogs()时都须要执行一次join()操做,可是数据具体的shuffle对咱们来讲倒是不可控的,也就是咱们不知道spark是如何进行分区的。spark默认在执行join()的时候会将两个RDD的键的hash值都算出来,而后将该hash值经过网络传输到同一个节点上进行相同键值的记录的链接操做,以下图所示:网络
由于userData这个RDD里面的数据是几乎不会变更的,或者说是极少会变更的,且它的内容也比events大不少,因此每次都要对它进行shuffle的话,是没有必要且浪费时间的,实际上只须要进行一次shuffle就能够了。分布式
因此,能够经过预先分区来解决这个问题:在进行join()以前,对userData使用partitionBy()转化操做,把它变成一个哈希分区的RDD:ide
val sc = new SparkContext(...) val userData = sc.sequenceFile[UserID, UserInfo]("hdfs://...") .partitionBy(new HashPartitioner(100)) // Create 100 partitions .persist()
调用partitionBy()以后,spark就能够预先知道这个RDD是已经进行过哈希分区的了,等到执行join()之时,它就会利用这一点:只对events进行shuffle,将events中特定UserID的记录发送到userData对应分区的机器节点上去。这样的话,就减小了大量的重复的网络通讯,程序性能也会大大提升。改进后的程序的执行过程以下:oop
还有一点,大家可能注意到了新代码里最后还调用了一个persist()方法,这是另外一个小优化点:对于那些数据不常变更且数据量较大的RDD,在进行诸如join()这种链接操做的时候尽可能用persist()来作缓存,以提升性能。另外,分区数目的设置也有讲究,分区数目决定了这个任务在执行链接操做时的并行度,因此通常来讲这个数目应该和集群中的总核心数保持一致。性能
最后,可能有人会问,能不能对events也进行分区进一步提升程序性能?这是没有必要的,由于events RDD是本地变量,每次执行都会更新,因此对它进行分区没有意义,即使对这种一次性变量进行分区,spark依然须要进行一次shuffle,因此,这是没有必要的。优化
PageRank算法是一种从RDD分区获益的更复杂的算法,下面咱们用它为例来进一步讲解Spark分区的使用。网站
若是有不清楚的PageRank算法的具体实现的能够参考我之前的一篇文章:hadoop下基于mapreduce实现pagerank算法
PageRank是一个迭代算法,所以它是一个能从RDD分区中得到性能加速的很好的例子,先上代码:
// Assume that our neighbor list was saved as a Spark objectFile val links = sc.objectFile[(String, Seq[String])]("links") .partitionBy(new HashPartitioner(100)) .persist() // Initialize each page's rank to 1.0; since we use mapValues, the resulting RDD // will have the same partitioner as links var ranks = links.mapValues(v => 1.0) // Run 10 iterations of PageRank for (i <- 0 until 10) { val contributions = links.join(ranks).flatMap { case (pageId, (links, rank)) => links.map(dest => (dest, rank / links.size)) } ranks = contributions.reduceByKey((x, y) => x + y).mapValues(v => 0.15 + 0.85*v) } // Write out the final ranks ranks.saveAsTextFile("ranks")
这个算法维护两个RDD,一个的键值对是(pageID, linkList),包含了每一个页面的出链指向的相邻页面列表(由pageID组成);另外一个的键值对是(pageID, rank),包含了每一个页面的当前权重值。算法流程以下:
不断迭代步骤2和3,过程当中算法会逐渐收敛于每一个页面的实际PageRank值,实际运行之时大概迭代10+次以上便可。
算法将ranksRDD的每一个元素的值设置为1.0,而后在每次迭代中不断更新ranks变量:首先对ranksRDD和静态的linksRDD进行一次join()操做,来获取每一个页面ID对应的相邻页面列表和当前的权重值,而后使用flatMap建立出『contributions』来记录每一个页面对各相邻页面的贡献值。而后再把这些贡献值按照pageID分别累加起来,把该页面的权重值设为0.15 + 0.85 * contributionsReceived。
接下来分析下上述代码作的的一些优化点:
https://www.safaribooksonline.com/library/view/learning-spark/9781449359034/ch04.html