本文主要是总结一下hbase几种写入常见的方式,以及涉及的应用场景,尽可能覆盖平常业务中的使用场景,另外再总结一下其中涉及到的一些原理知识。也算是本身学习的汇总。hbase也接触好久了,各类应用的场景也见到了不少。借此机会好好总结一下。html
hbase通常的插入过程都使用HTable对象,将数据封装在Put对象中,Put在new建立的时候须要传入rowkey,并将列族,列名,列值add进去。而后HTable调用put方法,经过rpc请求提交到Regionserver端。写入的方式能够分为如下几种java
HTable数据库
要向hbase中写入就免不了要和HTable打交道,HTable负责向一张hbase表中读或者写数据,HTable对象是非线程安全的。多线程使用时须要注意,建立HTable对象时须要指定表名参数,HTable内部有一个LinkedList<Row>的队列writeAsyncBuffer ,负责对写入到hbase的数据在客户端缓存,开启缓存使用参数 table.setAutoFlushTo(false); 默认状况不开启每次put一条数据时,htable对象就会调用flushCommits方法向regserver中提交,开启缓存则会比较队列的大小,若是大于某个值则调用flushCommits,这个值默认是2m,能够经过在hbase-site.xml中设置参数 "hbase.client.write.buffer"来调整,默认是2097152, 在关闭htable链接时,会隐式的调用flushCommits方法,保证数据彻底提交。提交时会根据rowkey定位该put应该提交到哪一个reginserver,而后每一个regionserver一组action发送出去,(多扯两句,这里和solr略微不一样,solr能够把请求发送到任一节点,节点判断是否属于当前节点,若是不符合则将请求发送全部节点,但同时也能够实现和hbase相似的功能)apache
单条put缓存
最简单基础的写入hbase,通常应用场景是线上业务运行时,记录单条插入,如报文记录,处理记录,写入后htable对象即释放。每次提交就是一次rpc请求。安全
table.setAutoFlushTo(true);
1 /** 2 * 插入一条记录 3 * rowkey 为rk001 列族为f1 4 * 插入两列 c1列 值为001 5 * c2列 值为002 6 * 7 */ 8 public void insertPut(){ 9 //Configuration 加载hbase的配置信息,HBaseConfiguration.create()是先new Configuration而后调用addResource方法将 10 //hbase-site.xml配置文件加载进来 11 Configuration conf = HBaseConfiguration.create(); 12 try { 13 table = new HTable(conf,tableName); 14 table.setAutoFlushTo(true);//不显示设置则默认是true 15 16 String rowkey = "rk001"; 17 Put put = new Put(rowkey.getBytes()); 18 put.add(cf.getBytes(),"c1".getBytes(),"001".getBytes()); 19 put.add(cf.getBytes(),"c2".getBytes(),"002".getBytes()); 20 table.put(put); 21 table.close();//关闭hbase链接 22 23 } catch (IOException e) { 24 e.printStackTrace(); 25 } 26 }
多条put多线程
有了单条的put天然就想到这种方式实际上是低效的,每次只能提交一条记录,有没有上面方法能够一次提交多条记录呢?减小请求次数, 最简单的方式使用List<Put>,这种方式操做时和单条put没有区别,将put对象add到list中,而后调用put(List<Put>)方法,过程和单条put基本一致,应用场景通常在数据量稍多的环境下,经过批量提交减小请求次数架构
1 /** 2 * 批量请求,一次提交两条 3 */ 4 public void insertPuts() { 5 Configuration conf = HBaseConfiguration.create(); 6 try { 7 table = new HTable(conf, tableName); 8 table.setAutoFlushTo(true); 9 List<Put> lists = new ArrayList<Put>(); 10 11 String rowkey1 = "rk001"; 12 Put put1 = new Put(rowkey1.getBytes()); 13 put1.add(cf.getBytes(), "c1".getBytes(), "001".getBytes()); 14 put1.add(cf.getBytes(), "c2".getBytes(), "002".getBytes()); 15 lists.add(put1); 16 17 String rowkey2 = "rk002"; 18 Put put2 = new Put(rowkey2.getBytes()); 19 put2.add(cf.getBytes(), "c1".getBytes(), "v2001".getBytes()); 20 put2.add(cf.getBytes(), "c2".getBytes(), "v2002".getBytes()); 21 lists.add(put2); 22 23 24 table.put(lists); 25 table.close(); 26 27 } catch (IOException e) { 28 e.printStackTrace(); 29 } 30 31 32 }
使用Mapreduceapp
以上两种方式通常用来处理小批量的数据,那么在面对数据量多的时候应该如何处理呢,常见的作法使用多线程来并行向hbase中写入,不过这须要咱们本身来控制任务的划分,比较麻烦,另外值得注意的时HTable对象是线程不安全的,所以在多线程写入时须要格外注意。而更加常见的作法是使用Mapreduce。HBase自己就是运行在hdfs上的数据库,所以和Mapreduce有很好的融合。分布式
使用mapreduce来向hbase中写入数据时,将输入文件拆分红一个个的块,而后交给集群,分布式的去读取块,而后数据写入到hbase中,而根据具体业务状况的不一样,在使用Mapreduce中也有略微的不一样,先介绍一下最多见的处理过程,使用hbase官方提供的hbase和mapreduce整合的工具类TableMapReduceUtil,具体使用细节能够参考HBase官方手册 这里只贴一下在map端读入数据,而后直接写hbase的情景,这种方式通常用于hive或者文件数据入hbase,不须要业务逻辑处理,保持原有的数据入库,rowkey通常时某个字段或者若干个字段拼接而成,好比卡号信息入库,使用卡号做为rowkey(须要对卡号作散列处理,卡号通常为62或者40开头,会形成数据热点问题)
1 package hbase.demo.mapreduce; 2 3 import org.apache.hadoop.conf.Configuration; 4 import org.apache.hadoop.conf.Configured; 5 import org.apache.hadoop.fs.Path; 6 import org.apache.hadoop.hbase.HBaseConfiguration; 7 import org.apache.hadoop.hbase.client.Put; 8 import org.apache.hadoop.hbase.io.ImmutableBytesWritable; 9 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; 10 import org.apache.hadoop.io.Text; 11 import org.apache.hadoop.io.Writable; 12 import org.apache.hadoop.mapreduce.Job; 13 import org.apache.hadoop.mapreduce.Mapper; 14 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 15 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 16 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 17 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 18 import org.apache.hadoop.util.Tool; 19 20 /** 21 * Created by BB on 2017/2/26. 22 */ 23 public class InsertMR extends Configured implements Tool { 24 25 26 public static void main(String[] args) throws Exception { 27 InsertMR im = new InsertMR(); 28 im.run(args); 29 } 30 31 public int run(String[] strings) throws Exception { 32 String jobName = "insert data into hbase"; 33 String outputTable = "OutTable"; 34 String inputPath = "/usr/mapreduce/input"; 35 String outputPath = "usr/maprduce/output"; 36 Configuration conf = HBaseConfiguration.create(); 37 Job job = Job.getInstance(conf, jobName); 38 39 job.setJarByClass(InsertMR.class); 40 41 job.setMapperClass(InsertMap.class); 42 job.setMapOutputKeyClass(ImmutableBytesWritable.class); 43 job.setMapOutputValueClass(Put.class); 44 45 job.setInputFormatClass(TextInputFormat.class);//hadoop 默认使用TextInputFormat 46 47 //设置输入输出路径 48 FileInputFormat.setInputPaths(job,new Path(inputPath)); 49 FileOutputFormat.setOutputPath(job,new Path(outputPath)); 50 51 52 TableMapReduceUtil.initTableReducerJob( 53 outputTable, 54 null, 55 job); 56 job.setNumReduceTasks(0); 57 return job.waitForCompletion(true) ? 0 : 1; 58 } 59 60 61 public class InsertMap extends Mapper<Writable, Text, ImmutableBytesWritable, Put> { 62 @Override 63 protected void map(Writable key, Text value, Context context) { 64 try { 65 66 String line = value.toString(); 67 String[] items = line.split(",", -1); 68 ImmutableBytesWritable outkey = new ImmutableBytesWritable(items[0].getBytes()); 69 String rk = items[0];//rowkey字段 70 Put put = new Put(rk.getBytes()); 71 put.add("f1".getBytes(), "c1".getBytes(), items[0].getBytes()); 72 put.add("f1".getBytes(), "c2".getBytes(), items[1].getBytes()); 73 context.write(outkey, put); 74 } catch (Exception e) { 75 76 77 } 78 } 79 80 } 81 82 83 }
这种方式最终会调用Tableoutputformat类,核心的原理仍是使用htable的put方法,不过因为使用了mapreduce分布式提交到hbase,速度比单线程效率高出许多,可是这种方式也不是万能的,put提交的熟读太快时会给hbase形成比较大的压力,容易发生gc形成节点挂掉,尤为是初始化表到hbase时,通常都会有不少的历史数据须要入库,容易形成比较大的压力,这种状况下建议使用下面的方式bulkload方式入库,减小给hbase压力。上面这种方式是直接在map中生成put而后交给TableOutputformat去提交的,由于这里几乎不须要逻辑处理,若是须要作逻辑处理,那么通常会在reduce端去生成put对象,在map端作业务逻辑处理,好比数据关联,汇总之类的
bulkload
若是在写入hbase的上述的方式仍是不能知足需求的话,就能够考虑使用bulkload的方式了。上述几种方式虽然实现的方式涉及到的东西不一样,可是本质是同样的,都是使用HTable对象调用put方法,而后HTable经过rpc提交到reginserver上,而后经过LSM过程以后最终写入到磁盘上。HBase的数据最终会变成hfile文件落到磁盘上,那么有没有一种方式能够绕过前面的这些过程,直接生成最终的hfile文件呢。确定是有的,bulkload写入hbase的原理正是基于此。使用mapreduce来生成hbase的hfile文件,而后将文件塞到hbase存储数据的目录下,这样作能够减小了海量的数据请求时间,也彻底避免了regionserver的处理数据的压力。因为涉及到hbase存储架构的原理,只大概讲一下过程,在map端生成put对象,reduce使用hbase提供的KeyValueSortReducer便可,reduce端会将数据按照rowkey作排序,生成hfile文件,而后按照region的分布对hfile作分割,将分割的hfile文件放到相应的region目录下,这里就不详细赘述,直接上代码
driver
1 package com.hbase.mapreudce.driver; 2 3 import java.io.IOException; 4 5 import org.apache.hadoop.conf.Configuration; 6 import org.apache.hadoop.conf.Configured; 7 import org.apache.hadoop.fs.Path; 8 import org.apache.hadoop.hbase.HBaseConfiguration; 9 import org.apache.hadoop.hbase.KeyValue; 10 import org.apache.hadoop.hbase.TableNotFoundException; 11 import org.apache.hadoop.hbase.io.ImmutableBytesWritable; 12 import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat; 13 import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2; 14 import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer; 15 import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles; 16 import org.apache.hadoop.hbase.mapreduce.SimpleTotalOrderPartitioner; 17 import org.apache.hadoop.io.Text; 18 import org.apache.hadoop.mapreduce.Job; 19 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 20 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 21 import org.apache.hadoop.util.GenericOptionsParser; 22 import org.apache.hadoop.util.Tool; 23 import org.apache.hadoop.util.ToolRunner; 24 import org.apache.log4j.Logger; 25 26 import com.hbase.mapreudce.map.BaseBulkLoadBaseMapper; 27 import com.spdbccc.mapreduce.plus.util.ConnectUtil; 28 import com.spdbccc.mapreduce.plus.util.Util; 29 30 public class BulkLoadHFileDriver extends Configured implements Tool { 31 32 private static Logger logger = Logger.getLogger(BulkLoadHFileDriver.class); 33 34 private String jobname; 35 36 private Configuration conf; 37 38 public static void main(String[] args) throws Exception { 39 BulkLoadHFileDriver bld = new BulkLoadHFileDriver(); 40 bld.excute(args); 41 } 42 43 public void excute(String[] args) throws Exception { 44 int rtn = ToolRunner.run(new BulkLoadHFileDriver(), args); 45 this.dobulkLoadFile(conf); 46 47 } 48 49 public int run(String[] args) throws Exception { 50 this.conf = HBaseConfiguration.create(); 51 String[] dfsArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); 52 53 // conf.get("", ""); 54 String tablename = conf.get("", ""); 55 String inputPathstr = conf.get("", ""); 56 String outputPathstr = conf.get("", ""); 57 58 Path outputPath = Util.getTempPath(conf, outputPathstr, true); 59 60 Job job = Job.getInstance(conf, "HFile bulk load test"); 61 job.setJarByClass(BulkLoadHFileDriver.class); 62 63 job.setMapperClass(BaseBulkLoadBaseMapper.class); 64 job.setReducerClass(KeyValueSortReducer.class); 65 66 job.setMapOutputKeyClass(ImmutableBytesWritable.class); 67 job.setMapOutputValueClass(KeyValue.class); 68 69 job.setPartitionerClass(SimpleTotalOrderPartitioner.class); 70 71 FileInputFormat.addInputPath(job, new Path(inputPathstr)); 72 FileOutputFormat.setOutputPath(job, outputPath); 73 74 HFileOutputFormat2.configureIncrementalLoad(job, ConnectUtil.getHTable(conf, tablename)); 75 76 LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf); 77 loader.doBulkLoad(new Path(dfsArgs[0]), ConnectUtil.getHTable(conf, tablename)); 78 79 return job.waitForCompletion(true) ? 0 : 1; 80 } 81 82 private void dobulkLoadFile(Configuration conf) throws Exception { 83 String tablename = conf.get("", ""); 84 String hfiledirpathstr = conf.get("", ""); 85 86 LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf); 87 loader.doBulkLoad(new Path(hfiledirpathstr), ConnectUtil.getHTable(conf, tablename)); 88 89 } 90 91 }
map
1 package com.hbase.mapreudce.map; 2 3 import java.io.IOException; 4 5 import org.apache.hadoop.hbase.KeyValue; 6 import org.apache.hadoop.hbase.io.ImmutableBytesWritable; 7 import org.apache.hadoop.hbase.util.Bytes; 8 import org.apache.hadoop.io.LongWritable; 9 import org.apache.hadoop.io.Text; 10 import org.apache.hadoop.io.Writable; 11 import org.apache.hadoop.mapreduce.Mapper; 12 import org.apache.log4j.Logger; 13 14 public class BaseBulkLoadBaseMapper extends 15 Mapper<Writable, Text, ImmutableBytesWritable, KeyValue> { 16 17 18 private static Logger logger = Logger.getLogger(BaseBulkLoadBaseMapper.class); 19 20 @Override 21 protected void map(Writable key, Text value, Context context) 22 throws IOException, InterruptedException { 23 String line = value.toString(); 24 String[] items = line.split(",", -1); 25 ImmutableBytesWritable rowkey = new ImmutableBytesWritable( 26 items[0].getBytes()); 27 28 KeyValue kv = new KeyValue(Bytes.toBytes(items[0]), 29 Bytes.toBytes(items[1]), Bytes.toBytes(items[2]), 30 System.currentTimeMillis(), Bytes.toBytes(items[3])); 31 if (null != kv) { 32 context.write(rowkey, kv); 33 } 34 35 36 37 } 47 }