HBase数据迁移到Kafka实战

1.概述

在实际的应用场景中,数据存储在HBase集群中,可是因为一些特殊的缘由,须要将数据从HBase迁移到Kafka。正常状况下,通常都是源数据到Kafka,再有消费者处理数据,将数据写入HBase。可是,若是逆向处理,如何将HBase的数据迁移到Kafka呢?今天笔者就给你们来分享一下具体的实现流程。html

2.内容

通常业务场景以下,数据源头产生数据,进入Kafka,而后由消费者(如Flink、Spark、Kafka API)处理数据后进入到HBase。这是一个很典型的实时处理流程。流程图以下:app

 

 上述这类实时处理流程,处理数据都比较容易,毕竟数据流向是顺序处理的。可是,若是将这个流程逆向,那么就会遇到一些问题。分布式

2.1 海量数据

HBase的分布式特性,集群的横向拓展,HBase中的数据每每都是百亿、千亿级别,或者数量级更大。这类级别的数据,对于这类逆向数据流的场景,会有个很麻烦的问题,那就是取数问题。如何将这海量数据从HBase中取出来?ide

2.2 没有数据分区

咱们知道HBase作数据Get或者List<Get>很快,也比较容易。而它又没有相似Hive这类数据仓库分区的概念,不能提供某段时间内的数据。若是要提取最近一周的数据,可能全表扫描,经过过滤时间戳来获取一周的数据。数量小的时候,可能问题不大,而数据量很大的时候,全表去扫描HBase很困难。函数

3.解决思路

对于这类逆向数据流程,如何处理。其实,咱们能够利用HBase Get和List<Get>的特性来实现。由于HBase经过RowKey来构建了一级索引,对于RowKey级别的取数,速度是很快的。实现流程细节以下:工具

 

 数据流程如上图所示,下面笔者为你们来剖析每一个流程的实现细节,以及注意事项。oop

3.1 Rowkey抽取

咱们知道HBase针对Rowkey取数作了一级索引,因此咱们能够利用这个特性来展开。咱们能够将海量数据中的Rowkey从HBase表中抽取,而后按照咱们制定的抽取规则和存储规则将抽取的Rowkey存储到HDFS上。学习

这里须要注意一个问题,那就是关于HBase Rowkey的抽取,海量数据级别的Rowkey抽取,建议采用MapReduce来实现。这个得益于HBase提供了TableMapReduceUtil类来实现,经过MapReduce任务,将HBase中的Rowkey在map阶段按照指定的时间范围进行过滤,在reduce阶段将rowkey拆分为多个文件,最后存储到HDFS上。大数据

这里可能会有同窗有疑问,都用MapReduce抽取Rowkey了,为啥不直接在扫描处理列簇下的列数据呢?这里,咱们在启动MapReduce任务的时候,Scan HBase的数据时只过滤Rowkey(利用FirstKeyOnlyFilter来实现),不对列簇数据作处理,这样会快不少。对HBase RegionServer的压力也会小不少。spa

Row Column
row001 info:name
row001 info:age
row001 info:sex
row001 info:sn

这里举个例子,好比上表中的数据,其实咱们只须要取出Rowkey(row001)。可是,实际业务数据中,HBase表描述一条数据可能有不少特征属性(例如姓名、性别、年龄、身份证等等),可能有些业务数据一个列簇下有十几个特征,可是他们却只有一个Rowkey,咱们也只须要这一个Rowkey。那么,咱们使用FirstKeyOnlyFilter来实现就很合适了。

/**
 * A filter that will only return the first KV from each row.
 * <p>
 * This filter can be used to more efficiently perform row count operations.
 */

这个是FirstKeyOnlyFilter的一段功能描述,它用于返回第一条KV数据,官方其实用它来作计数使用,这里咱们稍加改进,把FirstKeyOnlyFilter用来作抽取Rowkey。

3.2 Rowkey生成

抽取的Rowkey如何生成,这里可能根据实际的数量级来确认Reduce个数。建议生成Rowkey文件时,切合实际的数据量来算Reduce的个数。尽可能不用为了使用方便就一个HDFS文件,这样后面很差维护。举个例子,好比HBase表有100GB,咱们能够拆分为100个文件。

3.3 数据处理

在步骤1中,按照抽取规则和存储规则,将数据从HBase中经过MapReduce抽取Rowkey并存储到HDFS上。而后,咱们在经过MapReduce任务读取HDFS上的Rowkey文件,经过List<Get>的方式去HBase中获取数据。拆解细节以下:

 

 Map阶段,咱们从HDFS读取Rowkey的数据文件,而后经过批量Get的方式从HBase取数,而后组装数据发送到Reduce阶段。在Reduce阶段,获取来自Map阶段的数据,写数据到Kafka,经过Kafka生产者回调函数,获取写入Kafka状态信息,根据状态信息判断数据是否写入成功。若是成功,记录成功的Rowkey到HDFS,便于统计成功的进度;若是失败,记录失败的Rowkey到HDFS,便于统计失败的进度。

3.4 失败重跑

经过MapReduce任务写数据到Kafka中,可能会有失败的状况,对于失败的状况,咱们只须要记录Rowkey到HDFS上,当任务执行完成后,再去程序检查HDFS上是否存在失败的Rowkey文件,若是存在,那么再次启动步骤10,即读取HDFS上失败的Rowkey文件,而后再List<Get> HBase中的数据,进行数据处理后,最后再写Kafka,以此类推,直到HDFS上失败的Rowkey处理完成为止。

 

4.实现代码

这里实现的代码量也并不复杂,下面提供一个伪代码,能够在此基础上进行改造(例如Rowkey的抽取、MapReduce读取Rowkey并批量Get HBase表,而后在写入Kafka等)。示例代码以下:

public class MRROW2HDFS {

    public static void main(String[] args) throws Exception {

        Configuration config = HBaseConfiguration.create(); // HBase Config info
        Job job = Job.getInstance(config, "MRROW2HDFS");
        job.setJarByClass(MRROW2HDFS.class);
        job.setReducerClass(ROWReducer.class);

        String hbaseTableName = "hbase_tbl_name";

        Scan scan = new Scan();
        scan.setCaching(1000);
        scan.setCacheBlocks(false);
        scan.setFilter(new FirstKeyOnlyFilter());

        TableMapReduceUtil.initTableMapperJob(hbaseTableName, scan, ROWMapper.class, Text.class, Text.class, job);
        FileOutputFormat.setOutputPath(job, new Path("/tmp/rowkey.list")); // input you storage rowkey hdfs path
        System.exit(job.waitForCompletion(true) ? 0 : 1);

    }

    public static class ROWMapper extends TableMapper<Text, Text> {

        @Override
        protected void map(ImmutableBytesWritable key, Result value,
                Mapper<ImmutableBytesWritable, Result, Text, Text>.Context context)
                throws IOException, InterruptedException {

            for (Cell cell : value.rawCells()) {
                // Filter date range
                // context.write(...);
            }
        }
    }
    
    public static class ROWReducer extends Reducer<Text,Text,Text,Text>{
        private Text result = new Text();
        
        @Override
        protected void reduce(Text key, Iterable<Text> values,Context context) throws IOException, InterruptedException {
            for(Text val:values){
                result.set(val);
                context.write(key, result);
            }
        }
    }
}

5.总结

整个逆向数据处理流程,并不算复杂,实现也是很基本的MapReduce逻辑,没有太复杂的逻辑处理。在处理的过程当中,须要几个细节问题,Rowkey生成到HDFS上时,可能存在行位空格的状况,在读取HDFS上Rowkey文件去List<Get>时,最好对每条数据作个过滤空格处理。另外,就是对于成功处理Rowkey和失败处理Rowkey的记录,这样便于任务失败重跑和数据对帐。能够知晓数据迁移进度和完成状况。同时,咱们可使用Kafka Eagle监控工具来查看Kafka写入进度。

6.结束语

这篇博客就和你们分享到这里,若是你们在研究学习的过程中有什么问题,能够加群进行讨论或发送邮件给我,我会尽我所能为您解答,与君共勉!

另外,博主出书了《Kafka并不难学》和《Hadoop大数据挖掘从入门到进阶实战》,喜欢的朋友或同窗, 能够在公告栏那里点击购买连接购买博主的书进行学习,在此感谢你们的支持。关注下面公众号,根据提示,可免费获取书籍的教学视频。

相关文章
相关标签/搜索