Unlocking New Ways to Delete Records in Apache Hudi

1. Introduction

Before version 0.5.1, a user who wanted to delete records could use the Spark DataSource and set DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY to EmptyHoodieRecordPayload.class.getName to delete the specified records. The newly released 0.5.1 version of Hudi no longer requires this configuration and instead provides three ways to delete records: the Hudi API, the Spark DataSource, and DeltaStreamer. Each is introduced below.
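For reference, the pre-0.5.1 approach looked roughly like the sketch below (assuming the spark-shell imports and the df, tableName, and basePath values used in section 2.2; the exact package of EmptyHoodieRecordPayload is an assumption that can vary by version):

// A sketch of the old approach: writing the keys to delete with an empty payload
// removes the matching records on upsert
df.write.format("org.apache.hudi").
  options(getQuickstartWriteConfigs).
  option(PAYLOAD_CLASS_OPT_KEY, "org.apache.hudi.EmptyHoodieRecordPayload").
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_NAME, tableName).
  mode(Append).
  save(basePath)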

2. Steps

2.1 Using the Hudi API

If your application already embeds a HoodieWriteClient, you can delete records directly with the following HoodieWriteClient API:

/**
 * Deletes a list of {@link HoodieKey}s from the Hoodie table at the supplied commitTime.
 * {@link HoodieKey}s will be deduped, and non-existent keys will be removed before deleting.
 *
 * @param keys {@link List} of {@link HoodieKey}s to be deleted
 * @param commitTime Commit time handle
 * @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts
 */
public JavaRDD<WriteStatus> delete(JavaRDD<HoodieKey> keys, final String commitTime);
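For illustration, a delete could be issued through this API as in the sketch below (assumptions: a spark-shell session with the tableName and basePath values from section 2.2, a table that already contains data, and an illustrative key; the HoodieWriteClient package path differs slightly between releases):

// Package paths are version-dependent; org.apache.hudi.HoodieWriteClient in some releases
import org.apache.hudi.client.HoodieWriteClient
import org.apache.hudi.common.model.HoodieKey
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.spark.api.java.JavaSparkContext

val jsc = new JavaSparkContext(spark.sparkContext)

// Build a write config pointing at the existing table
val cfg = HoodieWriteConfig.newBuilder()
  .withPath(basePath)
  .forTable(tableName)
  .build()
val client = new HoodieWriteClient(jsc, cfg)

// Each HoodieKey is (recordKey, partitionPath); the values here are illustrative
val keys = jsc.parallelize(java.util.Arrays.asList(
  new HoodieKey("69cdb048-c93e-4532-adf9-f61ce6afe605", "americas/brazil/sao_paulo")))

// Start a commit and delete; inspect the returned WriteStatus RDD for errors
val commitTime = client.startCommit()
val statuses = client.delete(keys, commitTime)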

2.2 Using the DataSource

This section shows how to delete records from the sample dataset with the DataSource API, using the same setup as the Quick Start guide.

1. Launch spark-shell

bin/spark-shell --packages org.apache.hudi:hudi-spark-bundle:0.5.1-incubating,org.apache.spark:spark-avro_2.11:2.4.4 \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'

2. Import the required classes

import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._

val tableName = "hudi_cow_table"
val basePath = "file:///tmp/hudi_cow_table"
val dataGen = new DataGenerator

3. Insert data

val inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
df.write.format("org.apache.hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(RECORDKEY_FIELD_OPT_KEY, "uuid").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
option(TABLE_NAME, tableName).
mode(Overwrite).
save(basePath);

4. Query data

val roViewDF = spark.
read.
format("org.apache.hudi").
load(basePath + "/*/*/*/*")
roViewDF.createOrReplaceTempView("hudi_ro_table")
spark.sql("select count(*) from hudi_ro_table").show() // should return 10 (number of records inserted above)
val riderValue = spark.sql("select distinct rider from hudi_ro_table").show()
// copy the value displayed to be used in next step

5. Prepare the dataset to delete

First, prepare the dataset of records to delete with a query:

val df = spark.sql("select uuid, partitionPath from hudi_ro_table where rider = 'rider-213'")

6. Delete data

val deletes = dataGen.generateDeletes(df.collectAsList())
val df = spark.read.json(spark.sparkContext.parallelize(deletes, 2));
df.write.format("org.apache.hudi").
options(getQuickstartWriteConfigs).
option(OPERATION_OPT_KEY,"delete").
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(RECORDKEY_FIELD_OPT_KEY, "uuid").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
option(TABLE_NAME, tableName).
mode(Append).
save(basePath);

7. Verify

Reload the table records and verify that they were deleted:

val roViewDFAfterDelete = spark.
read.
format("org.apache.hudi").
load(basePath + "/*/*/*/*")
roViewDFAfterDelete.createOrReplaceTempView("hudi_ro_table")
spark.sql("select uuid, partitionPath from hudi_ro_table where rider = 'rider-213'").show() // should not return any rows

2.3 Using DeltaStreamer

Deleting with HoodieDeltaStreamer works the same way as an upsert: it relies on a specific boolean field named "_hoodie_is_deleted" in each record.

  • If the field's value is set to false or the field is absent, the record is treated as a regular upsert.

  • Otherwise (if the value is set to true), the record is treated as a delete.

This means the schema of the data source must be changed to add this field, and every incoming record should set its value; future releases will try to relax this requirement.

For example, if the original source schema is as follows:

{
  "type": "record",
  "name": "example_tbl",
  "fields": [{
    "name": "uuid",
    "type": "string"
  }, {
    "name": "ts",
    "type": "string"
  }, {
    "name": "partitionPath",
    "type": "string"
  }, {
    "name": "rank",
    "type": "long"
  }]
}

then, to use DeltaStreamer's delete capability, the schema must be changed as follows:

{
  "type": "record",
  "name": "example_tbl",
  "fields": [{
    "name": "uuid",
    "type": "string"
  }, {
    "name": "ts",
    "type": "string"
  }, {
    "name": "partitionPath",
    "type": "string"
  }, {
    "name": "rank",
    "type": "long"
  }, {
    "name": "_hoodie_is_deleted",
    "type": "boolean",
    "default": false
  }]
}

A sample incoming record for an upsert looks like this:

{"ts": 0.0, "uuid": "69cdb048-c93e-4532-adf9-f61ce6afe605", "rank": 1034, "partitionpath": "americas/brazil/sao_paulo", "_hoodie_is_deleted" : false}

A sample incoming record for a delete looks like this:

{"ts": 0.0, "uuid": "19tdb048-c93e-4532-adf9-f61ce6afe10", "rank": 1045, "partitionpath": "americas/brazil/sao_paulo", "_hoodie_is_deleted" : true}

Only this one-time change is needed. DeltaStreamer then handles both upserts and deletes in every batch, each batch may contain a mix of upserts and deletes, and no further steps or changes are required.
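For context, a DeltaStreamer job that applies such batches is typically launched with spark-submit along the lines of the sketch below; the jar name, source class, paths, and properties file here are placeholders, and flag names can differ slightly between releases (for example, --table-type was called --storage-type before 0.5.1):

# A launch sketch only; adjust jar, source, paths, and properties to your setup
spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
  hudi-utilities-bundle_2.11-0.5.1-incubating.jar \
  --table-type COPY_ON_WRITE \
  --source-class org.apache.hudi.utilities.sources.JsonDFSSource \
  --source-ordering-field ts \
  --target-base-path file:///tmp/example_tbl \
  --target-table example_tbl \
  --props /path/to/source.properties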

3. Summary

Hudi 0.5.1-incubating introduces three additional ways to delete records; users can adopt any of the approaches above to delete records.
