本套技术专栏是做者(秦凯新)平时工做的总结和升华,经过从真实商业环境抽取案例进行总结和分享,并给出商业应用的调优建议和集群环境容量规划等内容,请持续关注本套博客。版权声明:禁止转载,欢迎学习。QQ邮箱地址:1120746959@qq.com,若有任何商业交流,可随时联系。算法
主要分为4类:sql
catalyst主要组件有数据库
第一步:通常状况下,streamIter为大表,buildIter为小表,不用关心哪一个表为streamIter,哪一个表为buildIter,这个spark会根据join语句自动帮咱们完成。json
第二步: 先把小表广播到全部大表分区所在节点,而后根据buildIter Table的join key构建Hash Table,把每一行记录都存进HashTable微信
第三步:扫描streamIter Table 每一行数据,使用相同的hash函数匹配 Hash Table中的记录,匹配成功以后再检查join key 是否相等,最后join在一块儿分布式
总结 : hash join 只扫描两表一次,能够认为运算复杂度为o(a+b)。函数
调优性能
1 buildIter整体估计大小超过spark.sql.autoBroadcastJoinThreshold设定的值,
即不知足broadcast join条件
2 开启尝试使用hash join的开关,spark.sql.join.preferSortMergeJoin=false
3 每一个分区的平均大小不超过spark.sql.autoBroadcastJoinThreshold设定的值,即shuffle read阶段每一个分区来自buildIter的记录要能放到内存中
4 streamIter的大小是buildIter三倍以上
复制代码
学习 Python中单引号,双引号,3个单引号及3个双引号的区别请参考:https://blog.csdn.net/woainishifu/article/details/76105667
from pyspark.sql.types import *
>>> rdd1 = sc.parallelize([(1,'Alice', 18),(2,'Andy', 19),(3,'Bob', 17),(4,'Justin', 21),(5,'Cindy', 20)]
park.createDataFrame(rdd, schema)
df.show()>>> schema = StructType([ StructField("id", IntegerType(), True), StructField("name", StringType(), True), StructField("age", IntegerType(), True) ])
>>> df = spark.createDataFrame(rdd, schema)
>>> df.show()
+---+------+---+
| id| name|age|
+---+------+---+
| 1| Alice| 18|
| 2| Andy| 19|
| 3| Bob| 17|
| 4|Justin| 21|
| 5| Cindy| 20|
+---+------+---+
>>> rdd2 = sc.parallelize([('Alice', 160),('Andy', 159),('Bob', 170),('Cindy', 165),('Rose', 160)])
show()>>> schema2 = StructType([ StructField("name", StringType(), True), StructField("height", IntegerType(), True) ])
>>> df2 = spark.createDataFrame(rdd2, schema2)
>>> df2.show()
+-----+------+
| name|height|
+-----+------+
|Alice| 160|
| Andy| 159|
| Bob| 170|
|Cindy| 165|
| Rose| 160|
+-----+------+
复制代码
inner join是必定要找到左右表中知足join key 条件的记录,join key都存在的情形。学习
df.join(df2, "name", "inner").select("id", df.name, "age", "height").orderBy("id").show()
df.join(df3, ["id", "name"], "inner").select(df.id, df.name,"age", "height").orderBy(df.id).show()
df.join(df3, ["id", "name"], "inner").select(df.id, df['name'],"age", "height").orderBy(df.id).show()
>>> df.join(df2, "name", "inner").select("id", df.name, "age", "height").orderBy("id").show()
+---+-----+---+------+
| id| name|age|height|
+---+-----+---+------+
| 1|Alice| 18| 160|
| 2| Andy| 19| 159|
| 3| Bob| 17| 170|
| 5|Cindy| 20| 165|
+---+-----+---+------+
复制代码
left outer join是以左表为准,在右表中查找匹配的记录,若是查找失败,左表行Row不变,右表一行Row中全部字段都为null的记录。大数据
要求:左表是streamIter,右表是buildIter
df.join(df2, "name", "left").select("id", df.name, "age", "height").orderBy("id").show()
>>> df.join(df2, "name", "left").select("id", "name", "age", "height").orderBy("id").show()
+---+------+---+------+
| id| name|age|height|
+---+------+---+------+
| 1| Alice| 18| 160|
| 2| Andy| 19| 159|
| 3| Bob| 17| 170|
| 4|Justin| 21| null|
| 5| Cindy| 20| 165|
+---+------+---+------+
复制代码
right outer join是以右表为准,在左表中查找匹配的记录,若是查找失败,右表行Row不变,左表一行Row中全部字段都为null的记录。
要求:右表是streamIter,左表是buildIter
df.join(df2, "name", "right").select("id", df2.name, "age", "height").orderBy("id").show()
>>> df.join(df2, "name", "right").select("id", "name", "age", "height").orderBy("id").show()
+----+-----+----+------+
| id| name| age|height|
+----+-----+----+------+
|null| Rose|null| 160|
| 1|Alice| 18| 160|
| 2| Andy| 19| 159|
| 3| Bob| 17| 170|
| 5|Cindy| 20| 165|
+----+-----+----+------+
复制代码
full outer join仅采用sort merge join实现,左边和右表既要做为streamIter,又要做为buildIter
左表和右表已经排好序,首先分别顺序取出左表和右表中的一条记录,比较key,若是key相等,则joinrowA和rowB,并将rowA和rowB分别更新到左表和右表的下一条记录。
若是keyA<keyB,说明右表中没有与左表rowA对应的记录,那么joinrowA与nullRow。
将rowA更新到左表的下一条记录;若是keyA>keyB,则说明左表中没有与右表rowB对应的记录,那么joinnullRow与rowB。
将rowB更新到右表的下一条记录。如此循环遍历直到左表和右表的记录所有处理完。
>>> df.join(df2, "name", "outer").select("id", "name", "age", "height").orderBy("id").show()
+----+------+----+------+
| id| name| age|height|
+----+------+----+------+
|null| Rose|null| 160|
| 1| Alice| 18| 160|
| 2| Andy| 19| 159|
| 3| Bob| 17| 170|
| 4|Justin| 21| null|
| 5| Cindy| 20| 165|
+----+------+----+------+
复制代码
left semi join是以左表为准,在右表中查找匹配的记录,若是查找成功,则仅返回左表Row的记录,不然返回null。
left anti join与left semi join相反,是以左表为准,在右表中查找匹配的记录,若是查找成功,则返回null,不然仅返回左边的记录
from pyspark.sql.types import *
from pyspark.sql import Window
from pyspark.sql.functions import *
rdd = sc.parallelize([(1,'Alice', 18),(2,'Andy', 19),(3,'Bob', 17),(1,'Justin', 21),(1,'Cindy', 20)])
schema = StructType([
StructField("id", IntegerType(), True),
StructField("name", StringType(), True),
StructField("age", IntegerType(), True)
])
df = spark.createDataFrame(rdd, schema)
df.withColumn("rn", row_number().over(Window.partitionBy("id").orderBy("age"))).show()
+---+------+---+---+
| id| name|age| rn|
+---+------+---+---+
| 1| Alice| 18| 1|
| 1| Cindy| 20| 2|
| 1|Justin| 21| 3|
| 3| Bob| 17| 1|
| 2| Andy| 19| 1|
+---+------+---+---+
df.withColumn("rn", row_number().over(Window.partitionBy("id").orderBy("age"))).orderBy("age").show()
+---+------+---+---+
| id| name|age| rn|
+---+------+---+---+
| 3| Bob| 17| 1|
| 1| Alice| 18| 1|
| 2| Andy| 19| 1|
| 1| Cindy| 20| 2|
| 1|Justin| 21| 3|
+---+------+---+---+
复制代码
一直想深刻挖掘一下SparkSQL内部join原理,终于有时间详细的理一下 Shuffle Join 。做者还准备进一步研究Spark SQL 内核原理,敬请期待个人Spark SQL源码剖析系列。大数据商业实战社区微信公众号即将开启,敬请关注,谢谢!