详解 Spark 中的 Bucketing

什么是 Bucketing

Bucketing 就是利用 buckets(按列进行分桶)来决定数据分区(partition)的一种优化技术,它能够帮助在计算中避免数据交换(avoid data shuffle)。并行计算的时候shuffle经常会耗费很是多的时间和资源.html

Bucketing 的基本原理比较好理解,它会根据你指定的列(能够是一个也能够是多个)计算哈希值,而后具备相同哈希值的数据将会被分到相同的分区。sql

bucket

Bucket和Partition的区别

Bucket的最终目的也是实现分区,可是和Partition的原理不一样,当咱们根据指定列进行Partition的时候,Spark会根据列的名字对数据进行分区(若是没有指定列名则会根据一个随机信息对数据进行分区)。Bucketing的最大不一样在于它使用了指定列的哈希值,这样能够保证具备相同列值的数据被分到相同的分区。函数

怎么用 Bucket

按Bucket保存

目前在使用 bucketBy 的时候,必须和 sortBy,saveAsTable 一块儿使用,以下。这个操做实际上是将数据保存到了文件中(若是不指定path,也会保存到一个临时目录中)。post

df.write
  .bucketBy(10, "name")
  .sortBy("name")
  .mode(SaveMode.Overwrite)
  .option("path","/path/to")
  .saveAsTable("bucketed")

数据分桶保存以后,咱们才能使用它。测试

直接从table读取

在一个SparkSession内,保存以后你能够经过以下命令经过表名获取其对应的DataFrame.大数据

val df = spark.table("bucketed")

其中spark是一个SparkSession对象。获取以后就可使用DataFrame或者在SQL中使用表。优化

从已经保存的Parquet文件读取

若是你要使用历史保存的数据,那么就不能用上述方法了,也不能像读取常规文件同样使用 spark.read.parquet() ,这种方式读进来的数据是不带bucket信息的。正确的方法是利用CREATE TABLE 语句,详情可用参考 https://docs.databricks.com/spark/latest/spark-sql/language-manual/create-table.htmlspa

CREATE TABLE [IF NOT EXISTS] [db_name.]table_name
  [(col_name1 col_type1 [COMMENT col_comment1], ...)]
  USING data_source
  [OPTIONS (key1=val1, key2=val2, ...)]
  [PARTITIONED BY (col_name1, col_name2, ...)]
  [CLUSTERED BY (col_name3, col_name4, ...) INTO num_buckets BUCKETS]
  [LOCATION path]
  [COMMENT table_comment]
  [TBLPROPERTIES (key1=val1, key2=val2, ...)]
  [AS select_statement]

示例以下:scala

spark.sql(
  """
    |CREATE TABLE bucketed
    | (name string)
    |  USING PARQUET
    |  CLUSTERED BY (name) INTO 10 BUCKETS
    |  LOCATION '/path/to'
    |""".stripMargin)

用Buckets的好处

在咱们join两个表的时候,若是两个表最好按照相同的列划分红相同的buckets,就能够彻底避免shuffle。根据前面所述的hash值计算方法,两个表具备相同列值的数据会存放在相同的机器上,这样在进行join操做时就不须要再去和其余机器通信,直接在本地完成计算便可。假设你有左右两个表,各有两个分区,那么join的时候实际计算就是下图的样子,两个机器进行计算,而且计算后分区仍是2.3d

with bucket

而当须要shuffle的时候,会是这样的,
without bucket

细心的你可能发现了,上面两个分区对应两个Executor,下面shuffle以后对应的怎么成了三个Executor了?没错,当数据进行shuffle以后,分区数就再也不保持和输入的数据相同了,实际上也没有必要保持相同。

本地测试

咱们考虑的是大数据表的链接,本地测试的时候通常使用小的表,因此逆序须要将小表自动广播的配置关掉。若是开启小表广播,那么两个小表的join以后分区数是不会变的,例如:

左表分区数 右表分区数数 Join以后的分区数
3 3 3

关闭配置的命令以下:

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

正常状况下join以后分区数会发生变化:

左表分区数 右表分区数数 Join以后的分区数
3 3 200

这个200其实就是 "spark.sql.shuffle.partitions" 配置的值,默认就是200. 因此若是在Join过程当中出现了shuffle,join以后的分区必定会变,而且变成spark.sql.shuffle.partitions的值。一般你须要根据本身的集群资源修改这个值,从而优化并行度,可是shuffle是不可避免的。

左右两个表Bucket数目不一致时

实际测试结果以下:

左表Bucket数 右表Bucekt数 Join以后的分区数
8 4 8
4 4 4

Spark依然会利用一些Bucekt的信息,但具体怎么执行目前还不太清楚,仍是保持一致的好。

另外,若是你spark job的可用计算核心数小于Bucket值,那么从文件中读取以后Bucekt值会变,就是说bucket的数目不会超过你能使用的最大计算核数。

不要使用的 <=> 符号!!!

在处理null值的时候,咱们可能会用到一些特殊的函数或者符号,以下表所示。可是在使用bucket的时候这里有个坑,必定要躲过。join的时候千万不要使用 <=> 符号,使用以后spark就会忽略bucket信息,继续shuffle数据,缘由可能和hash计算有关。

null

原文链接

若是你喜欢个人文章,能够在 任一平台搜索【黑客悟理】关注我,很是感谢!
相关文章
相关标签/搜索