在HBase中,表格的Rowkey按照字典排序,Region按照RowKey设置split point进行shard,经过这种方式实现的全局、分布式索引,成为了其成功的最大的砝码html
每个索引创建一个表,而后依靠表的row key来实现范围检索。row key在HBase中是以B+ tree结构化有序存储的,因此scan起来会比较效率。
单表以row key存储索引,column value存储id值或其余数据 ,这就是Hbase索引表的结构。java
Hbase QualifierFilter用于过滤qualifier,也就是一个列族里面data:xxx,冒号后面的字符串git
大数据最好从rowkey入手,ColumnValueFilter的数度是很慢的,hbase查询速度仍是要依靠rowkey,因此根据业务逻辑把rowkey设计好,以后全部的查询都经过rowkey,是会很是快。 批量查询最好是用 scan的startkey endkey来作查询条件github
rowkey是hbase中很重要的一个设计,若是你把它当成普通字段那你的设计就有点失败了。它的设计能够说是一门艺术。你的查询若是不能把rowkey加入进来,那你的设计基本是失败的。加上rowkey,hbase能够快速地定位到具体的region去取你要的数据,不然就会满上遍野的找数据。apache
设计原则:缓存
1. 长度越短越好安全
Rowkey是一个二进制码流,Rowkey的长度被不少开发者建议说设计在10~100个字节,不过建议是越短越好,不要超过16个字节。负载均衡
缘由以下:分布式
(1)数据的持久化文件HFile中是按照KeyValue存储的,若是Rowkey过长好比100个字节,1000万列数据光Rowkey就要占用100*1000万=10亿个字节,将近1G数据,这会极大影响HFile的存储效率;ide
(2)MemStore将缓存部分数据到内存,若是Rowkey字段过长内存的有效利用率会下降,系统将没法缓存更多的数据,这会下降检索效率。所以Rowkey的字节长度越短越好。
(3)目前操做系统是都是64位系统,内存8字节对齐。控制在16个字节,8字节的整数倍利用操做系统的最佳特性。
2. 散列原则:若是Rowkey是按时间戳的方式递增,不要将时间放在二进制码的前面,建议将Rowkey的高位做为散列字段,由程序循环生成,低位放时间字段,这样将提升数据均衡分布在每一个Regionserver实现负载均衡的概率。若是没有散列字段,首字段直接是时间信息将产生全部新数据都在一个 RegionServer上堆积的热点现象,这样在作数据检索的时候负载将会集中在个别RegionServer,下降查询效率。
3. 惟一性
HBase按指定的条件获取一批记录时,使用的就是scan方法。 scan方法有如下特色:
(1)scan能够经过setCaching与setBatch方法提升速度(以空间换时间);
(2)scan能够经过setStartRow与setEndRow来限定范围。范围越小,性能越高。
经过巧妙的RowKey设计使咱们批量获取记录集合中的元素挨在一块儿(应该在同一个Region下),能够在遍历结果时得到很好的性能。
(3)scan能够经过setFilter方法添加过滤器,这也是分页、多条件查询的基础。
设计RowKey时能够这样作:采用 UserID + CreateTime + FileID组成RowKey。
须要注意如下几点:
(1)每条记录的RowKey,每一个字段都须要填充到相同长度。假如预期咱们最多有10万量级的用户,则userID应该统一填充至6位,如000001,000002…
(2)结尾添加全局惟一的FileID的用意也是使每一个文件对应的记录全局惟一。避免当UserID与CreateTime相同时的两个不一样文件记录相互覆盖。
RowKey存储上述文件记录,在HBase表中是下面的结构:
rowKey(userID 6 + time 8 + fileID 6) name category ….
00000120120902000001
应用实例
//时间范围的查找, 好比是2012-12-12到2013-01-23日之间的数据 FilterList filter = new FilterList(); if (timeFrom != null) { String sDate = String.valueOf(timeFrom.getTime()); SingleColumnValueFilter scvf = new SingleColumnValueFilter(Bytes.toBytes("CF"), Bytes.toBytes("Date"), CompareOp.GREATER_OR_EQUAL, Bytes.toBytes(String.valueOf(sDate))); filter.addFilter(scvf); } if (timeTo != null) { String sDate = String.valueOf(timeTo.getTime()); SingleColumnValueFilter scvf = new SingleColumnValueFilter(Bytes.toBytes("CF"), Bytes.toBytes("Date"), CompareOp.LESS_OR_EQUAL, Bytes.toBytes(String.valueOf(sDate))); filter.addFilter(scvf); }
HBase在0.92以后引入了coprocessors,提供了一系列的钩子,让咱们可以轻易实现访问控制和二级索引的特性。下面简单介绍下两种coprocessors,第一种是Observers,它实际相似于触发器,第二种是Endpoint,它相似与存储过程。因为这里只用到了Observers,因此只介绍Observers,想要更详细的介绍请查阅(https://blogs.apache.org/hbase/entry/coprocessor_introduction)。observers分为三种:
RegionObserver:提供数据操做事件钩子;
WALObserver:提供WAL(write ahead log)相关操做事件钩子;
MasterObserver:提供DDL操做事件钩子。
在二级索引的实现技术上通常有几个方案:
1. 表索引
使用单独的hbase表存储索引数据,业务表的索引列值作为索引表的rowkey,业务表的rowkey作为索引表的qualifier或value。
问题:对数据更新性能影响较大;没法保证一致性;Client查询须要2次RPC(先索引表再数据表)。
2. 列索引
与业务表使用相同表,使用单独列族存储索引,用户数据列值作为索引列族的Qualifier,用户数据Qualifier作为索引列族的列值。适用于单行有上百万Qualifier的数据模型,如网盘应用中网盘ID作为rowkey,网盘的目录元数据都存储在一个hbase row内。(facebook消息模型也是此方案)
可保证事务性
为了实现像SQL同样检索数据,select * from table where col=val。针对HBase Secondary Indexing的方案,成为HBase新版本(0.96)呼声最高的一项Feature。
粗略分析了当前的技术,大概的方案能够总结为这样两类:
一、使用HBase的coprocessor。CoProcessor至关于HBase的Observer+hook,目前支持MasterObserver、RegionObserver和WALObserver,基本上对于HBase Table的管理、数据的Put、Delete、Get等操做均可以找到对应的pre***和post***。这样若是须要对于某一项Column创建Secondary Indexing,就能够在Put、Delete的时候,将其信息更新到另一张索引表中。如图二所示,对于Indexing里面的value值是否存储的问题,能够根据须要进行控制,若是value的空间开销不大,逆向的检索又比较频繁,能够直接存储在Indexing Table中,反之则避免这种状况。
图2 使用HBase Coprocessor实现Secondary Indexing
二、由客户端发起对于主表和索引表的Put、Delete操做的双重操做。源自:http://hadoop-hbase.blogspot.com/2012/10/musings-on-secondary-indexes.html 【墙外】
它具体的作法总结起来有:
虽然在这种方案里没法保证原子性和一致性,可是经过TimeStamp的设置,No Locks和 No Server-side codes,使其在二级索引上有着较大的优点。至于中间出错的环节,咱们看看是否能够容忍:
1)Put索引表成功,Put主表失败。因为Indexing Table不存储val值,仍须要跳转到Main Table,因此这样的错误至关于拿一个Stale index去访问对应Rowkey吧了,对结果正确性没有影响。
2)Delete主表成功,Delete索引表失败。都是索引表的内容>=主表的内容而已,而实际返回值须要经过主表进行。
应用场景:
一、主表服务在线业务,它的性能须要保证。使用coprocessor和客户端的封装也好,都会影响其性能,因此在正常状况下,直接操做都不太合适。若是想使用方案二,我却是感受,能够调整Indexing Table的操做方式,去除保证其安全性的内容,好比能够关闭写HLOG,这样会进一步减低其操做的延迟。
二、离线更新索引表。在真正须要二级索引的场景内,其时效性要求每每不高。能够将索引实时更新到Redis等KV系统中,定时从KV更新索引到Hbase的Indexing Table中。PS:Redis里面有DB设置的概念,能够按照时间段进行隔离,这样某段时间内的数据会更新到Redis上,保证Redis导入MapReduce以后仍然能够进行update操做。
coprocessor代码实现 ??
We have been working on implementing secondary index in HBase and open sourced on hbase 0.94.8 version. The project is available on github. https://github.com/Huawei-Hadoop/hindex This Jira is to support secondary index on trunk(0.98). Following features will be supported. multiple indexes on table, multi column index, index based on part of a column value, equals and range condition scans using index, and bulk loading data to indexed table (Indexing done with bulk load) Most of the kernel changes needed for secondary index is available in trunk. Very minimal changes needed for it.
首先在HBase-0.19.3中必须设置参数,使得Hbase可使用索引,修改$HBASE_INSTALL_DIR/conf/hbase-site.xml:
<property> <name>hbase.regionserver.class</name> <value>org.apache.hadoop.hbase.ipc.IndexedRegionInterface</value> </property> <property> <name>hbase.regionserver.impl</name> <value> org.apache.hadoop.hbase.regionserver.tableindexed.IndexedRegionServer </value> </property>
(1)建立表时,增长二级索引:
HBaseConfiguration conf = new HBaseConfiguration(); conf.addResource(new Path("/opt/hbase-0.19.3/conf/hbase-site.xml")); HTableDescriptor desc = new HTableDescriptor("test_table"); desc.addFamily(new HColumnDescriptor("columnfamily1:")); desc.addFamily(new HColumnDescriptor("columnfamily2:")); desc.addIndex(new IndexSpecification("column1", Bytes.toBytes("columnfamily1:column1"))); desc.addIndex(new IndexSpecification("column2", Bytes.toBytes("columnfamily1:column2"))); IndexedTableAdmin admin = null; admin = new IndexedTableAdmin(conf); admin.createTable(desc);
(2)在已经存在的表中,增长索引
HBaseConfiguration conf = new HBaseConfiguration(); conf.addResource(new Path("/opt/hbase-0.19.3/conf/hbase-site.xml")); IndexedTableAdmin admin = null; admin = new IndexedTableAdmin(conf); admin.addIndex(Bytes.toBytes("test_table"), new IndexSpecification("column2", Bytes.toBytes("columnfamily1:column2")));
(3)删除存在的索引
HBaseConfiguration conf = new HBaseConfiguration(); conf.addResource(new Path("/opt/hbase-0.19.3/conf/hbase-site.xml")); IndexedTableAdmin admin = null; admin = new IndexedTableAdmin(conf); admin.removeIndex(Bytes.toBytes("test_table"), "column2");
(4)经过索引scan全部数据
HBaseConfiguration conf = new HBaseConfiguration(); conf.addResource(new Path("/opt/hbase-0.19.3/conf/hbase-site.xml")); IndexedTable table = new IndexedTable(conf, Bytes.toBytes("test_table")); // You need to specify which columns to get Scanner scanner = table.getIndexedScanner("column1", HConstants.EMPTY_START_ROW, null, null, new byte[][] { Bytes.toBytes("columnfamily1:column1"), Bytes.toBytes("columnfamily1:column2") }); for (RowResult rowResult : scanner) { String value1 = new String( rowResult.get(Bytes.toBytes("columnfamily1:column1")).getValue()); String value2 = new String( rowResult.get(Bytes.toBytes("columnfamily1:column2")).getValue()); System.out.println(value1 + ", " + value2); } table.close();
(5)经过索引scan一部分子集,经过ColumnValueFilter过滤。
使用SingleColumnValueFilter会影响查询性能,在真正处理海量数据时会消耗很大的资源,且须要较长的时间
ColumnValueFilter filter = new ColumnValueFilter(Bytes.toBytes("columnfamily1:column1"), CompareOp.LESS, Bytes.toBytes("value1-10")); scanner = table.getIndexedScanner("column1", HConstants.EMPTY_START_ROW, null, filter, new byte[][] { Bytes.toBytes("columnfamily1:column1"), Bytes.toBytes("columnfamily1:column2")); for (RowResult rowResult : scanner) { String value1 = new String( rowResult.get(Bytes.toBytes("columnfamily1:column1")).getValue()); String value2 = new String( rowResult.get(Bytes.toBytes("columnfamily1:column2")).getValue()); System.out.println(value1 + ", " + value2); }
通常不建议用Filter,scan.setFilters(),经过filter设置的条件查不到数据时,响应速度很是慢,大概在十几秒,有时会超时,
但能够查到数据时,响应速度只有几百ms,差距很是大
Scan scan = new Scan(); FilterList filters = new FilterList(); for (String[] param : params) { //param[0]为列名,param[1]为相应的值 filters.addFilter(new SingleColumnValueFilter("INFO".getBytes(), param[0].getBytes(), CompareOp.EQUAL, param[1].getBytes())); } scan.setFilter(filters);
(6)一个彻底的例子
import java.io.IOException; import java.util.Date; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.client.Scanner; import org.apache.hadoop.hbase.client.tableindexed.IndexSpecification; import org.apache.hadoop.hbase.client.tableindexed.IndexedTable; import org.apache.hadoop.hbase.client.tableindexed.IndexedTableAdmin; import org.apache.hadoop.hbase.filter.ColumnValueFilter; import org.apache.hadoop.hbase.filter.ColumnValueFilter.CompareOp; import org.apache.hadoop.hbase.io.BatchUpdate; import org.apache.hadoop.hbase.io.RowResult; import org.apache.hadoop.hbase.util.Bytes; public class SecondaryIndexTest { public void writeToTable() throws IOException { HBaseConfiguration conf = new HBaseConfiguration(); conf.addResource(new Path("/opt/hbase-0.19.3/conf/hbase-site.xml")); IndexedTable table = new IndexedTable(conf, Bytes.toBytes("test_table")); String row = "test_row"; BatchUpdate update = null; for (int i = 0; i < 100; i++) { update = new BatchUpdate(row + i); update.put("columnfamily1:column1", Bytes.toBytes("value1-" + i)); update.put("columnfamily1:column2", Bytes.toBytes("value2-" + i)); table.commit(update); } table.close(); } public void readAllRowsFromSecondaryIndex() throws IOException { HBaseConfiguration conf = new HBaseConfiguration(); conf.addResource(new Path("/opt/hbase-0.19.3/conf/hbase-site.xml")); IndexedTable table = new IndexedTable(conf, Bytes.toBytes("test_table")); Scanner scanner = table.getIndexedScanner("column1", HConstants.EMPTY_START_ROW, null, null, new byte[][] { Bytes.toBytes("columnfamily1:column1"), Bytes.toBytes("columnfamily1:column2") }); for (RowResult rowResult : scanner) { System.out.println(Bytes.toString( rowResult.get(Bytes.toBytes("columnfamily1:column1")).getValue()) + ", " + Bytes.toString(rowResult.get( Bytes.toBytes("columnfamily1:column2")).getValue() )); } table.close(); } public void readFilteredRowsFromSecondaryIndex() throws IOException { HBaseConfiguration conf = new HBaseConfiguration(); conf.addResource(new Path("/opt/hbase-0.19.3/conf/hbase-site.xml")); IndexedTable table = new IndexedTable(conf, Bytes.toBytes("test_table")); ColumnValueFilter filter = new ColumnValueFilter(Bytes.toBytes("columnfamily1:column1"), CompareOp.LESS, Bytes.toBytes("value1-40")); Scanner scanner = table.getIndexedScanner("column1", HConstants.EMPTY_START_ROW, null, filter, new byte[][] { Bytes.toBytes("columnfamily1:column1"), Bytes.toBytes("columnfamily1:column2") }); for (RowResult rowResult : scanner) { System.out.println(Bytes.toString( rowResult.get(Bytes.toBytes("columnfamily1:column1")).getValue()) + ", " + Bytes.toString(rowResult.get( Bytes.toBytes("columnfamily1:column2")).getValue() )); } table.close(); } public void createTableWithSecondaryIndexes() throws IOException { HBaseConfiguration conf = new HBaseConfiguration(); conf.addResource(new Path("/opt/hbase-0.19.3/conf/hbase-site.xml")); HTableDescriptor desc = new HTableDescriptor("test_table"); desc.addFamily(new HColumnDescriptor("columnfamily1:column1")); desc.addFamily(new HColumnDescriptor("columnfamily1:column2")); desc.addIndex(new IndexSpecification("column1", Bytes.toBytes("columnfamily1:column1"))); desc.addIndex(new IndexSpecification("column2", Bytes.toBytes("columnfamily1:column2"))); IndexedTableAdmin admin = null; admin = new IndexedTableAdmin(conf); if (admin.tableExists(Bytes.toBytes("test_table"))) { if (admin.isTableEnabled("test_table")) { admin.disableTable(Bytes.toBytes("test_table")); } admin.deleteTable(Bytes.toBytes("test_table")); } if (admin.tableExists(Bytes.toBytes("test_table-column1"))) { if (admin.isTableEnabled("test_table-column1")) { admin.disableTable(Bytes.toBytes("test_table-column1")); } admin.deleteTable(Bytes.toBytes("test_table-column1")); } admin.createTable(desc); } public void addSecondaryIndexToExistingTable() throws IOException { HBaseConfiguration conf = new HBaseConfiguration(); conf.addResource(new Path("/opt/hbase-0.19.3/conf/hbase-site.xml")); IndexedTableAdmin admin = null; admin = new IndexedTableAdmin(conf); admin.addIndex(Bytes.toBytes("test_table"), new IndexSpecification("column2", Bytes.toBytes("columnfamily1:column2"))); } public void removeSecondaryIndexToExistingTable() throws IOException { HBaseConfiguration conf = new HBaseConfiguration(); conf.addResource(new Path("/opt/hbase-0.19.3/conf/hbase-site.xml")); IndexedTableAdmin admin = null; admin = new IndexedTableAdmin(conf); admin.removeIndex(Bytes.toBytes("test_table"), "column2"); } public static void main(String[] args) throws IOException { SecondaryIndexTest test = new SecondaryIndexTest(); test.createTableWithSecondaryIndexes(); test.writeToTable(); test.addSecondaryIndexToExistingTable(); test.removeSecondaryIndexToExistingTable(); test.readAllRowsFromSecondaryIndex(); test.readFilteredRowsFromSecondaryIndex(); System.out.println("Done!"); } }