spark基础知识请参考spark官网:http://spark.apache.org/docs/1.2.1/quick-start.htmlhtml
不管是mapreduce仍是spark ,分布式框架的性能优化方向大体分为:负载均衡、网络传输和磁盘I/O 这三块。而spark是基于内存的计算框架,所以在编写应用时须要充分利用其内存计算特征。本篇主要针对sql
spark应用中的join问题进行讨论,关于集群参数的优化会在另外一篇文章中说起。数据库
在传统的数据库平台和分布式计算平台,join的性能消耗都是很可观的,对spark来讲若是join的表比较大,那么在shuffle时网络及磁盘压力会明显提高,严重时可能会形成excutor失败致使任务没法进行下去,apache
对这种join的优化方法主要是采用map和filter来改变join的实现方式,减小shuffle阶段的网络和磁盘I/O。下面以表的数据量大小分两部分来讨论。缓存
大表:数据量较大的表性能优化
小表:数据量较小的表网络
1、大表与小表之间的join数据结构
这种join是大部分业务场景的主要join方式,将小表以broadcast的形式分发到每一个executor后对大表进行filter操做,如下对每种join进行示例说明(兼容表中ID不惟一的状况)。负载均衡
一、leftOuterJoin 框架
>>>d1=sc.parallelize([(1,2),(2,3),(2,4),(3,4)])
>>>d2=sc.parallelize([(1,'a'),(2,'b'),(1,'d'),(5,'2')])
原生实现方式:
>>>d1.leftOuterJoin(d2).collect()
>>>[(1, (2, 'a')), (1, (2, 'd')), (2, (4, 'b')), (2, (3, 'b')), (3, (4, None))]
map实现方式(小表在右的实现方式,小表在左的状况会稍微复杂些,须要多一些操做操做,实际场景中很少见):
def doJoin(row): result=[] if row[1][1] is not None: for i in row[1][1]: result+=[(row[0],(row[1][0],i))] else: result+=[row] return result d2_map={} for i in d2.groupByKey().collect(): d2_map[i[0]]=i[1] d2_broadcast=sc.broadcast(d2_map) d2_dict=d2_broadcast.value d1.map(lambda row:(row[0],(row[1],d2_dict.get(row[0])))).flatMap(doJoin).collect()
>>>[(1, (2, 'd')), (1, (2, 'a')), (2, (3, 'b')), (2, (4, 'b')), (3, (4, None))]
二、join
这里的join指的是innerjoin即只取出匹配到的数据项,只须要在上面的实现方式中加个filter便可
d1.map(lambda row:(row[0],(row[1],d2_dict.get(row[0])))).filter(lambda row:row[1][1] is not None).flatMap(doJoin).collect()
>>>[(1, (2, 'd')), (1, (2, 'a')), (2, (3, 'b')), (2, (4, 'b'))]
2、大表与大表之间的join(Reduce-join)
大表之间的join没法经过缓存数据来达到优化目的,所以须要把优化的重点放在分区效率及key的设计上
一、join的key值尽可能使用数值类型,减小分区及shuffle的操做时间,在join时数值类型的key值在匹配时更快
二、将过滤条件放在join以前,使得join的数据量尽可能最少
三、在join以前将两个表按相同分区数进行从新分区
reduce-join:指将两个表按key值进行分区,相同key的数据会被分在同一个分区,最后使用mapPartition进行join操做。
四、若是须要减小分区和并行度,请使用coalesce 而非repartition 方法。
* If you are decreasing the number of partitions in this RDD, consider using `coalesce`,
* which can avoid performing a shuffle.
3、其它优化方式
一、同一份数据被屡次用到,在读入时进行缓存,后面直接使用,例如配置表,若是数据量不大则进行broadcast,不然使用cache
二、尽可能减小重复计算,一样的计算逻辑只计算一次
三、几个优化参数
spark.akka.frameSize 1000 集群间通讯 一帧数据的大小,设置过小可能会致使通讯延迟
spark.akka.timeout 100 通讯等待最长时间(秒为单位)
spark.akka.heartbeat.pauses 600 心跳失败最大间隔(秒为单位)
spark.serializer org.apache.spark.serializer.KryoSerializer 序列化方式(sprak本身的实现方式)
spark.sql.autoBroadcastJoinThreshold -1 禁止自动broadcast表
spark.shuffle.consolidateFiles true shuffle 自动合并小文件
4、后续优化方向
一、内存优化:对象所占用的内存,访问对象的消耗以及垃圾回收(garbage collection)所占用的开销
二、优化数据结构
三、优化RDD存储
四、并行度