在实际生产环境中,有这样一种场景:用户数据位于HDFS中,业务须要按期将这部分海量数据导入HBase系统,以执行随机查询更新操做。这种场景若是调用写入API进行处理,极有可能会给RegionServer带来较大的写入压力:sql
•引发RegionServer频繁flush,进而不断compact、split,影响集群稳定性。shell
•引发RegionServer频繁GC,影响集群稳定性。apache
•消耗大量CPU资源、带宽资源、内存资源以及IO资源,与其余业务产生资源竞争。app
•在某些场景下,好比平均KV大小比较大的场景,会耗尽RegionServer的处理线程,致使集群阻塞。工具
鉴于存在上述问题,HBase提供了另外一种将数据写入HBase集群的方法——BulkLoad。BulkLoad首先使用MapReduce将待写入集群数据转换为HFile文件,再直接将这些HFile文件加载到在线集群中。显然,BulkLoad方案没有将写请求发送给RegionServer处理,能够有效避免上述一系列问题。oop
BulkLoad核心流程ui
从HBase的视角来看,BulkLoad主要由两个阶段组成:spa
1)HFile生成阶段。这个阶段会运行一个MapReduce任务,MapReduce的mapper须要本身实现,将HDFS文件中的数据读出来组装成一个复合KV,其中Key是rowkey,Value能够是KeyValue对象、Put对象甚至Delete对象;MapReduce的reducer由HBase负责,经过方法HFileOutputFormat2.configureIncrementalLoad()进行配置,这个方法主要负责如下事项。线程
•根据表信息配置一个全局有序的partitioner。code
•将partitioner文件上传到HDFS集群并写入DistributedCache。
•设置reduce task的个数为目标表Region的个数。
•设置输出key/value类知足HFileOutputFormat所规定的格式要求。
•根据类型设置reducer执行相应的排序(KeyValueSortReducer或者PutSortReducer)。
这个阶段会为每一个Region生成一个对应的HFile文件。
2)HFile导入阶段。HFile准备就绪以后,就可使用工具completebulkload将HFile加载到在线HBase集群。completebulkload工具主要负责如下工做。
•依次检查第一步生成的全部HFile文件,将每一个文件映射到对应的Region。
•将HFile文件移动到对应Region所在的HDFS文件目录下。
•告知Region对应的RegionServer,加载HFile文件对外提供服务。
若是在BulkLoad的中间过程当中Region发生了分裂,completebulkload工具会自动将对应的HFile文件按照新生成的Region边界切分红多个HFile文件,保证每一个HFile都能与目标表当前的Region相对应。但这个过程须要读取HFile内容,于是并不高效。须要尽可能减小HFile生成阶段和HFile导入阶段的延迟,最好可以在HFile生成以后马上执行HFile导入。
基于BulkLoad两阶段的工做原理,BulkLoad的核心流程如图所示。
BulkLoad基础案例
在hbase上建立一张表:
create 'test_log','ext'
执行BulkLoad代码:
import org.apache.hadoop.hbase.client.ConnectionFactory import org.apache.hadoop.hbase.{HBaseConfiguration, KeyValue, TableName} import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.mapreduce.{HFileOutputFormat2, LoadIncrementalHFiles} import org.apache.hadoop.hbase.util.Bytes import org.apache.spark.SparkConf import org.apache.spark.sql.SparkSession object BulkLoad1 { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setAppName("HbaseBulkLoad") val spark = SparkSession.builder .config(sparkConf) .getOrCreate() val sc = spark.sparkContext val datas = List( ("abc", ("ext", "type", "login")), ("ccc", ("ext", "type", "logout")) ) val dataRdd = sc.parallelize(datas) val output = dataRdd.map { x => { val rowKey = Bytes.toBytes(x._1) val immutableRowKey = new ImmutableBytesWritable(rowKey) val colFam = x._2._1 val colName = x._2._2 val colValue = x._2._3 val kv = new KeyValue( rowKey, Bytes.toBytes(colFam), Bytes.toBytes(colName), Bytes.toBytes(colValue.toString) ) (immutableRowKey, kv) } } val hConf = HBaseConfiguration.create() hConf.addResource("hbase_site.xml") val hTableName = "test_log" hConf.set("hbase.mapreduce.hfileoutputformat.table.name",hTableName) val tableName = TableName.valueOf(hTableName) val conn = ConnectionFactory.createConnection(hConf) val table = conn.getTable(tableName) val regionLocator = conn.getRegionLocator(tableName) val hFileOutput = "/tmp/h_file" output.saveAsNewAPIHadoopFile(hFileOutput, classOf[ImmutableBytesWritable], classOf[KeyValue], classOf[HFileOutputFormat2], hConf) val bulkLoader = new LoadIncrementalHFiles(hConf) bulkLoader.doBulkLoad(new Path(hFileOutput),conn.getAdmin,table,regionLocator) } }
提交spark执行:
spark-submit \
--master yarn \
--conf spark.yarn.tokens.hbase.enabled=true \
--deploy-mode client \
--class BulkLoad1
--executor-memory 512m
--driver-memory 512m
--total-executor-cores 2
/home/hadoop/hadoop-2.8.5/files/Spark_study.jar
在hbase shell上查看:
scan 'test_log'