[TOC]
测试案例需求:在原表LJK_TEST上,将mycf:name作为二级索引。
建立一张索引表:
# Create the index table. It uses the same column family (mycf) as the source table LJK_TEST;
# each index row key is a name value and mycf:id holds the source row key.
create 'INDEX_LJK_TEST','mycf'
编写协处理器(RegionObserver)代码:
public class SecondIndexObserver extends BaseRegionObserver { private static final String INDEX_TABLE_NAME = "INDEX_LJK_TEST"; private static final byte[] COLUMN_FAMILY = Bytes.toBytes("mycf"); private static final byte[] COLUMN_NAME = Bytes.toBytes("name"); private static final byte[] COLUMN_ID = Bytes.toBytes("id"); private Configuration configuration = HBaseConfiguration.create(); @Override public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException { HTable hTable = new HTable(configuration, INDEX_TABLE_NAME); List<Cell> cells = put.get(COLUMN_FAMILY, COLUMN_NAME); for (Cell cell : cells) { Put indexPut = new Put(CellUtil.cloneValue(cell)); indexPut.addColumn(COLUMN_FAMILY, COLUMN_ID, CellUtil.cloneRow(cell)); hTable.put(indexPut); } } }
将jar包上传到HDFS,并给表LJK_TEST加上协处理器。
# Attach SecondIndexObserver to LJK_TEST. The coprocessor spec is 'jar path|class|priority|args';
# priority and args are left empty here.
# NOTE(review): the jar path has no scheme, so it is presumably resolved against the cluster's
# default filesystem (HDFS here) — confirm, or use a full hdfs:// URI.
alter 'LJK_TEST','coprocessor'=>'/user/LJK/hbase.server.test-1.0-SNAPSHOT.jar|com.sunsharing.SecondIndexObserver||'
测试
往原表插入一条数据,检查二级索引表的内容是否符合预期。
可以看到索引表中相应地增加了一条数据。
hbase(main):004:0> put 'LJK_TEST','003','mycf:name','LJK3' 0 row(s) in 0.0930 seconds hbase(main):006:0> scan 'INDEX_LJK_TEST' ROW COLUMN+CELL LJK3 column=mycf:id, timestamp=1562055903019, value=003 1 row(s) in 0.0110 seconds
测试案例需求:在原表LJK_TEST上,将mycf:name作为二级索引(通过MapReduce为表中已有的数据批量建立索引)。
写代码
public class MrIndexBuilder { static class MyMapper extends TableMapper<ImmutableBytesWritable, Put> { private String columnFamily; private String quality; private String indexTableName; @Override protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException { List<Cell> columnCells = value.getColumnCells(Bytes.toBytes(columnFamily), Bytes.toBytes(quality)); for (Cell cell : columnCells) { byte[] indexRow = CellUtil.cloneValue(cell); Put put = new Put(indexRow); put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("id"), key.get()); context.write(new ImmutableBytesWritable(Bytes.toBytes(indexTableName)), put); } } @Override protected void setup(Context context) { Configuration configuration = context.getConfiguration(); this.columnFamily = configuration.get("cf"); this.quality = configuration.get("qa"); this.indexTableName = configuration.get("indexTalbeName"); } } public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = HBaseConfiguration.create(); if (args.length < 4) { throw new RuntimeException("参数传入错误,须要4个参数,原表名,二级索引表名,原表的CF,原表做为二级索引的字段名!"); } String tableName = args[0]; String indexTalbeName = args[1]; String columnFamily = args[2]; String indexQualify = args[3]; conf.set("cf", columnFamily); conf.set("qa", indexQualify); conf.set("indexTalbeName", indexTalbeName); Job mrIndexBuilder = new Job(conf, "MrIndexBuilder"); mrIndexBuilder.setJarByClass(MrIndexBuilder.class); mrIndexBuilder.setMapperClass(MyMapper.class); mrIndexBuilder.setInputFormatClass(TableInputFormat.class); mrIndexBuilder.setOutputFormatClass(MultiTableOutputFormat.class); mrIndexBuilder.setNumReduceTasks(0); Scan scan = new Scan(); scan.setCaching(500); scan.setCacheBlocks(false); TableMapReduceUtil.initTableMapperJob(tableName, scan, MyMapper.class, ImmutableBytesWritable.class, Put.class, mrIndexBuilder); boolean b = mrIndexBuilder.waitForCompletion(true); if 
(!b) { throw new IOException("任务报错!"); } } }
打成jar包,放到HBase集群中的某一台服务器上,执行以下命令:
# Run the index-building job. `hbase classpath` puts the HBase jars on the hadoop command's
# classpath. Arguments: source table, index table, column family, indexed qualifier.
HADOOP_CLASSPATH=`hbase classpath` hadoop jar hbase.server.test-1.0-SNAPSHOT.jar com.sunsharing.MrIndexBuilder LJK_TEST INDEX_LJK_TEST mycf name
验证结果,符合预期:
hbase(main):021:0> scan 'INDEX_LJK_TEST' ROW COLUMN+CELL LJK column=mycf:id, timestamp=1562657562219, value=002 LJK3 column=mycf:id, timestamp=1562657562219, value=003 LJK4 column=mycf:id, timestamp=1562657562219, value=004 LJK5 column=mycf:id, timestamp=1562657562219, value=005 LJK6 column=mycf:id, timestamp=1562657562219, value=006 LJK7 column=mycf:id, timestamp=1562657562219, value=007 LJK8 column=mycf:id, timestamp=1562657562219, value=008 7 row(s) in 0.3670 seconds
该方案最为简单:先在Phoenix中创建一张映射到HBase原表的表,再在其上创建全局二级索引。
-- Map the existing HBase table LJK_TEST into Phoenix: the row key becomes ID and mycf:name becomes
-- the column "mycf"."name". Quoted identifiers keep their lower-case HBase spelling.
-- NOTE(review): on Phoenix 4.10+ mapping an existing HBase table usually also needs
-- COLUMN_ENCODED_BYTES = 0 so Phoenix reads the original column qualifiers — confirm per version.
CREATE TABLE LJK_TEST (ID VARCHAR NOT NULL PRIMARY KEY,"mycf"."name" VARCHAR);
-- Global index on the name column. It must target the table created above (LJK_TEST), and the
-- column must be quoted: Phoenix upper-cases unquoted identifiers, so plain name would not match.
-- Phoenix only maintains this index for writes made through Phoenix, not raw HBase puts.
CREATE INDEX COVER_LJKTEST_INDEX ON LJK_TEST("mycf"."name");
该方案基本可以应付所有情况,待补充。需要注意的是,Phoenix只会为通过Phoenix写入的数据维护索引,直接通过HBase API或HBase shell写入的数据不会同步更新索引表。