transformation 操做,相似于MapReduce 中的combiner
算法
val lines = sc.textFile("hdfs://")
val words = lines.flatMap(_.split(" "))
val pairs = words.map((_, 1))
val counts = pairs.reduceByKey(_ + _)
counts.collect()
reduceByKey,相较于普通的shuffle操做(好比groupByKey),它的一个特色,就是说,
会进行map端的本地聚合。
对map端给下个stage每一个task建立的输出文件中,写数据以前,就会进行本地的combiner操做,
也就是说对每个key,对应的values,都会执行你的算子函数(_ + _)
用reduceByKey对性能的提高:
一、在本地进行聚合之后,在map端的数据量就变少了,减小磁盘IO。并且能够减小磁盘空间的占用。
二、下一个stage,拉取数据的量,也就变少了。减小网络的数据传输的性能消耗。
三、在reduce端进行数据缓存的内存占用变少了。
四、reduce端,要进行聚合的数据量也变少了。
总结:
reduceByKey在什么状况下使用呢?
一、很是普通的,好比说,就是要实现相似于wordcount程序同样的,对每一个key对应的值,
进行某种数据公式或者算法的计算(累加、累乘)
二、对于一些相似于要对每一个key进行一些字符串拼接的这种较为复杂的操做,能够本身衡量一下,
其实有时,也是可使用reduceByKey来实现的。可是不太好实现。若是真可以实现出来,
对性能绝对是有帮助的。(shuffle基本上就占了整个spark做业的90%以上的性能消耗,
主要能对shuffle进行必定的调优,都是有价值的)
咱们的程序没有那么去作!可是把这个看成一个课后思考题给你们,看你们能不能对咱们的聚合session
的操做应用上ReduceByKey来提升性能!
sql
算子调优之使用repartition解决Spark SQL低并行度的性能问题
spark.sql.shuffle.partitions 调整DataFrame的shuffle并行度
spark.default.parallelism 调整RDD的shuffle并行度
并行度:以前说过,并行度是本身能够调节,或者说是设置的。
一、spark.default.parallelism
二、textFile(),传入第二个参数,指定partition数量(比较少用)
我们的项目代码中,没有设置并行度,实际上,在生产环境中,是最好本身设置一下的。
官网有推荐的设置方式,你的spark-submit脚本中,会指定你的application总共要启动多少个executor,
100个;每一个executor多少个cpu core,2~3个;总共application,有cpu core,200个。
官方推荐,根据你的application的总cpu core数量(在spark-submit中能够指定,200个),
本身手动设置spark.default.parallelism参数,指定为cpu core总数的2~3倍。400~600个并行度。600。
承上启下
你设置的这个并行度,在哪些状况下会生效?哪些状况下,不会生效?
若是你压根儿没有使用Spark SQL(DataFrame),那么你整个spark application默认全部stage的并行度
都是你设置的那个参数。(除非你使用coalesce算子缩减过partition数量)
问题来了,Spark SQL,用了。用Spark SQL的那个stage的并行度,你无法本身指定。
Spark SQL本身会默认根据hive表对应的hdfs文件的block,自动设置Spark SQL查询所在的那个stage的
并行度。你本身经过spark.default.parallelism参数指定的并行度,只会在没有Spark SQL的stage中生效。
好比你第一个stage,用了Spark SQL从hive表中查询出了一些数据,而后作了一些transformation操做,
接着作了一个shuffle操做(groupByKey);下一个stage,在shuffle操做以后,
作了一些transformation操做。hive表,对应了一个hdfs文件,有20个block;
你本身设置了spark.default.parallelism参数为100。
你的第一个stage的并行度,是不受你的控制的,就只有20个task;第二个stage,
才会变成你本身设置的那个并行度,100。
问题在哪里?
Spark SQL默认状况下,它的那个并行度,我们无法设置。可能致使的问题,也许没什么问题,
也许颇有问题。Spark SQL所在的那个stage中,后面的那些transformation操做,
可能会有很是复杂的业务逻辑,甚至说复杂的算法。若是你的Spark SQL默认把task数量设置的不多,
20个,而后每一个task要处理为数很多的数据量,而后还要执行特别复杂的算法。
这个时候,就会致使第一个stage的速度,特别慢。第二个stage,1000个task,刷刷刷,很是快。
解决上述Spark SQL没法设置并行度和task数量的办法,是什么呢?
repartition算子,你用Spark SQL这一步的并行度和task数量,确定是没有办法去改变了。可是呢,
能够将你用Spark SQL查询出来的RDD,使用repartition算子,去从新进行分区,
此时能够分区成多个partition,好比从20个partition,分区成100个。
而后呢,从repartition之后的RDD,再日后,并行度和task数量,就会按照你预期的来了。
就能够避免跟Spark SQL绑定在一个stage中的算子,只能使用少许的task去处理大量数据以及
复杂的算法逻辑。
这里就颇有可能发生上面说的问题
好比说,Spark SQl默认就给第一个stage设置了20个task,可是根据你的数据量以及算法的复杂度
实际上,你须要1000个task去并行执行
因此说,在这里,就能够对Spark SQL刚刚查询出来的RDD执行repartition重分区操做
数据库
默认状况下,通过了这种filter以后,RDD中的每一个partition的数据量,可能都不太同样了。
(本来每一个partition的数据量多是差很少的)
问题:
一、每一个partition数据量变少了,可是在后面进行处理的时候,仍是要跟partition数量同样数量的task,
来进行处理;有点浪费task计算资源。
二、每一个partition的数据量不同,会致使后面的每一个task处理每一个partition的时候,
每一个task要处理的数据量就不一样,这个时候很容易发生什么问题?
数据倾斜。。。。
好比说,第二个partition的数据量才100;可是第三个partition的数据量是900;
那么在后面的task处理逻辑同样的状况下,不一样的task要处理的数据量可能差异达到了9倍,
甚至10倍以上;一样也就致使了速度的差异在9倍,甚至10倍以上。
这样的话呢,就会致使有些task运行的速度很快;有些task运行的速度很慢。这,就是数据倾斜。
针对上述的两个问题,咱们但愿应该可以怎么样?
一、针对第一个问题,咱们但愿能够进行partition的压缩吧,由于数据量变少了,
那么partition其实也彻底能够对应的变少。好比原来是4个partition,如今彻底能够变成2个partition。
那么就只要用后面的2个task来处理便可。就不会形成task计算资源的浪费。
(没必要要,针对只有一点点数据的partition,还去启动一个task来计算)
二、针对第二个问题,其实解决方案跟第一个问题是同样的;也是去压缩partition,
尽可能让每一个partition的数据量差很少。那么这样的话,后面的task分配到的partition的数据量
也就差很少。不会形成有的task运行速度特别慢,有的task运行速度特别快。避免了数据倾斜的问题。
有了解决问题的思路以后,接下来,咱们该怎么来作呢?实现?
缓存
主要就是用于在filter操做以后,针对每一个partition的数据量各不相同的状况,来压缩partition的数量。
减小partition的数量,并且让每一个partition的数据量都尽可能均匀紧凑。
从而便于后面的task进行计算操做,在某种程度上,可以必定程度的提高性能。
说明一下:
这儿,是对完整的数据进行了filter过滤,过滤出来点击行为的数据点击行为的数据其实只占总数据的一小部分(譬如 20%)
因此过滤之后的RDD,每一个partition的数据量,颇有可能跟咱们以前说的同样,会很不均匀并且数据量确定会变少不少
因此针对这种状况,仍是比较合适用一下coalesce算子的,在filter事后去减小partition的数量
coalesce(100)
这个就是说通过filter以后再把数据压缩的比较紧凑,压缩为100个数据分片,也就是造成了 100 个 partition
对这个coalesce操做作一个说明
若是运行模式都是local模式,主要是用来测试,因此local模式下,
不用去设置分区和并行度的数量
local模式本身自己就是进程内模拟的集群来执行,自己性能就很高
并且对并行度、partition数量都有必定的内部的优化
这里咱们再本身去设置,就有点多此一举
可是就是跟你们说明一下,coalesce算子的使用,便可
网络
foreach的写库原理
默认的foreach的性能缺陷在哪里?
首先,对于每条数据,都要单独去调用一次function,task为每一个数据,都要去执行一次function函数。
若是100万条数据,(一个partition),调用100万次。性能比较差。
另一个很是很是重要的一点
若是每一个数据,你都去建立一个数据库链接的话,那么你就得建立100万次数据库链接。
可是要注意的是,数据库链接的建立和销毁,都是很是很是消耗性能的。虽然咱们以前已经用了
数据库链接池,只是建立了固定数量的数据库链接。
你仍是得屡次经过数据库链接,往数据库(MySQL)发送一条SQL语句,而后MySQL须要去执行这条SQL语句。
若是有100万条数据,那么就是100万次发送SQL语句。
以上两点(数据库链接,屡次发送SQL语句),都是很是消耗性能的。
foreachPartition,在生产环境中,一般来讲,都使用foreachPartition来写数据库的
使用批处理操做(一条SQL和多组参数)
发送一条SQL语句,发送一次
一会儿就批量插入100万条数据。
用了foreachPartition算子以后,好处在哪里?
一、对于咱们写的function函数,就调用一次,一次传入一个partition全部的数据
二、主要建立或者获取一个数据库链接就能够
三、只要向数据库发送一次SQL语句和多组参数便可
在实际生产环境中,清一色,都是使用foreachPartition操做;可是有个问题,跟mapPartitions操做同样,
若是一个partition的数量真的特别特别大,好比真的是100万,那基本上就不太靠谱了。
一会儿进来,颇有可能会发生OOM,内存溢出的问题。
一组数据的对比:生产环境
一个partition大概是1千条左右
用foreach,跟用foreachPartition,性能的提高达到了2~3分钟。
实际项目操做:
首先JDBCHelper里面已经封装好了一次批量插入操做!
批量插入session detail
惟一不同的是咱们须要ISessionDetailDAO里面去实现一个批量插入
List<SessionDetail> sessionDetails
session