Bucketing 就是利用 buckets(按列进行分桶)来决定数据分区(partition)的一种优化技术,它能够帮助在计算中避免数据交换(avoid data shuffle)。并行计算的时候shuffle经常会耗费很是多的时间和资源.html
Bucketing 的基本原理比较好理解,它会根据你指定的列(能够是一个也能够是多个)计算哈希值,而后具备相同哈希值的数据将会被分到相同的分区。sql
Bucket的最终目的也是实现分区,可是和Partition的原理不一样,当咱们根据指定列进行Partition的时候,Spark会根据列的名字对数据进行分区(若是没有指定列名则会根据一个随机信息对数据进行分区)。Bucketing的最大不一样在于它使用了指定列的哈希值,这样能够保证具备相同列值的数据被分到相同的分区。函数
目前在使用 bucketBy 的时候,必须和 sortBy,saveAsTable 一块儿使用,以下。这个操做实际上是将数据保存到了文件中(若是不指定path,也会保存到一个临时目录中)。post
df.write .bucketBy(10, "name") .sortBy("name") .mode(SaveMode.Overwrite) .option("path","/path/to") .saveAsTable("bucketed")
数据分桶保存以后,咱们才能使用它。测试
在一个SparkSession内,保存以后你能够经过以下命令经过表名获取其对应的DataFrame.大数据
val df = spark.table("bucketed")
其中spark是一个SparkSession对象。获取以后就可使用DataFrame或者在SQL中使用表。优化
若是你要使用历史保存的数据,那么就不能用上述方法了,也不能像读取常规文件同样使用 spark.read.parquet() ,这种方式读进来的数据是不带bucket信息的。正确的方法是利用CREATE TABLE 语句,详情可用参考 https://docs.databricks.com/spark/latest/spark-sql/language-manual/create-table.htmlspa
CREATE TABLE [IF NOT EXISTS] [db_name.]table_name [(col_name1 col_type1 [COMMENT col_comment1], ...)] USING data_source [OPTIONS (key1=val1, key2=val2, ...)] [PARTITIONED BY (col_name1, col_name2, ...)] [CLUSTERED BY (col_name3, col_name4, ...) INTO num_buckets BUCKETS] [LOCATION path] [COMMENT table_comment] [TBLPROPERTIES (key1=val1, key2=val2, ...)] [AS select_statement]
示例以下:scala
spark.sql( """ |CREATE TABLE bucketed | (name string) | USING PARQUET | CLUSTERED BY (name) INTO 10 BUCKETS | LOCATION '/path/to' |""".stripMargin)
在咱们join两个表的时候,若是两个表最好按照相同的列划分红相同的buckets,就能够彻底避免shuffle。根据前面所述的hash值计算方法,两个表具备相同列值的数据会存放在相同的机器上,这样在进行join操做时就不须要再去和其余机器通信,直接在本地完成计算便可。假设你有左右两个表,各有两个分区,那么join的时候实际计算就是下图的样子,两个机器进行计算,而且计算后分区仍是2.3d
而当须要shuffle的时候,会是这样的,
细心的你可能发现了,上面两个分区对应两个Executor,下面shuffle以后对应的怎么成了三个Executor了?没错,当数据进行shuffle以后,分区数就再也不保持和输入的数据相同了,实际上也没有必要保持相同。
咱们考虑的是大数据表的链接,本地测试的时候通常使用小的表,因此逆序须要将小表自动广播的配置关掉。若是开启小表广播,那么两个小表的join以后分区数是不会变的,例如:
左表分区数 | 右表分区数数 | Join以后的分区数 |
---|---|---|
3 | 3 | 3 |
关闭配置的命令以下:
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
正常状况下join以后分区数会发生变化:
左表分区数 | 右表分区数数 | Join以后的分区数 |
---|---|---|
3 | 3 | 200 |
这个200其实就是 "spark.sql.shuffle.partitions" 配置的值,默认就是200. 因此若是在Join过程当中出现了shuffle,join以后的分区必定会变,而且变成spark.sql.shuffle.partitions的值。一般你须要根据本身的集群资源修改这个值,从而优化并行度,可是shuffle是不可避免的。
实际测试结果以下:
左表Bucket数 | 右表Bucekt数 | Join以后的分区数 |
---|---|---|
8 | 4 | 8 |
4 | 4 | 4 |
Spark依然会利用一些Bucekt的信息,但具体怎么执行目前还不太清楚,仍是保持一致的好。
另外,若是你spark job的可用计算核心数小于Bucket值,那么从文件中读取以后Bucekt值会变,就是说bucket的数目不会超过你能使用的最大计算核数。
在处理null值的时候,咱们可能会用到一些特殊的函数或者符号,以下表所示。可是在使用bucket的时候这里有个坑,必定要躲过。join的时候千万不要使用 <=> 符号,使用以后spark就会忽略bucket信息,继续shuffle数据,缘由可能和hash计算有关。
若是你喜欢个人文章,能够在 任一平台搜索【黑客悟理】关注我,很是感谢!