部分业务须要使用HBase的数据进行多维度分析,咱们采用了将部分数据同步到Solr,经过Solr进行多维度查询返回对应的Rowkey,再从HBase批量获取数据。所以咱们使用了一个比较成熟的方案Lily HBase Indexer来同步二级索引到Solr。可是使用的时候出现了Solr丢失数据的问题。基本上天天Solr都会比HBase少几千条数据。java
因为咱们使用的是CDH集群,下面全部操做都是基于该环境git
到每一个节点的/var/log/hbase-solr
和/var/log/solr
查看了日志,都没发现写入失败的记录github
因为日志没有发现错误,猜想是Solr的数据在缓存中没提交上去。
在solr的collecttion目录下的conf/solrconfig.xml文件,将Solr的硬提交激活,操做以下缓存
<autoCommit>
<maxTime>${solr.autoCommit.maxTime:60000}</maxTime>
<openSearcher>true</openSearcher>
</autoCommit>
而后保存配置,将修改update 到Solr集群。而后测试仍旧出现上述问题app
目前是没看到问题出在哪里了,所以只能去网上搜索一下具体缘由了。网上有这么两个帖子
hbase-indexer solr numFound different from hbase table rows size异步
HBase Indexer致使Solr与HBase数据不一致问题解决性能
他们都提到了修改morphline-hbase-mapper.xml,添加read-row
以下:
测试
从新刷新hbase-indexer配置ui
此次发现数目对了,可是字段缺了spa
因为设置了read-row以后数据不会再次从HBase中获取,所以只会读取WAL。假如修改了部分字段,HBaseIndexer就会提交相应的字段上去。例如
HBase中有name和age两个字段
put 'HBase_Indexer_Test','001','cf1:name','xiaoming'
put 'HBase_Indexer_Test','002','cf1:name','xiaohua'
此时的数据为
而后执行
put 'HBase_Indexer_Test','001','cf1:age','12'
最后只能看到
说明这种模式只从WAL获取数据,而且将获取的数据覆盖到了Solr里面。
那么这样看来只能修改HBase indexer的代码了
Lily HBase Indexer的代码是托管在github 上的,若是是单独安装的请直接访问NGDATA的这个工程:http://ngdata.github.io/hbase-indexer/
若是是使用的CDH版本,请访问:https://github.com/cloudera/hbase-indexer
我这里使用CDH 5.7.0版本进行测试。在releases选项中能够找到对应版本号的包,下载解压以后能够看到一个Maven工程。能够看到它包含以下模块
在./hbase-indexer-engine/src/main/java/com/ngdata/hbaseindexer/indexer/Indexer.java
文件中有一个calculateIndexUpdates方法,其中有以下代码:
Result result = rowData.toResult();
if(conf.getRowReadMode()==RowReadMode.DYNAMIC){
if(!mapper.containsRequiredData(result)){
result = readRow(rowData);
}
}
boolean rowDeleted = result.isEmpty();
privateResult readRow(RowData rowData)throwsIOException{
TimerContext timerContext = rowReadTimer.time();
try{
HTableInterface table = tablePool.getTable(rowData.getTable());
try{
Getget= mapper.getGet(rowData.getRow());
return table.get(get);
}finally{
table.close();
}
}finally{
timerContext.stop();
}
}
从代码中能够看出其执行的流程图以下:
假如咱们使用默认的Dynamic模式写入了大量的数据,那么意味着有部分数据会在WAL生成后一段时间内没法“落地”,那么就可能出现下面的状况:
知道了问题在哪里以后,咱们尝试修改他的源码。因为HBase将预写日志的内容写到HBase region中会有必定的滞后性,所以咱们能够认为预写日志中的内容老是最新的数据。假设咱们有一条rowkey =001的数据以下:
列名 | 值 |
---|---|
Rowkey | 001 |
cf1:A | a |
cf1:B | b |
cf1:C | c |
咱们将C的值改为D。因为夹杂在不少条数据中,可能日志中拿到了C = 'd',可是HBase中仍旧是'c',咱们须要将HBase的数据拿出来,再将预写日志中的数据覆盖它,便有了下面的代码
privateResult readRow(RowData rowData)throwsIOException{
TimerContext timerContext = rowReadTimer.time();
try{
HTableInterface table = tablePool.getTable(rowData.getTable());
try{
Get get = mapper.getGet(rowData.getRow());
return merge(table.get(get), rowData.toResult());
//return table.get(get);
}finally{
table.close();
}
}finally{
timerContext.stop();
}
}
privateResult merge(Result data,Result wal)throwsIOException{
//若是data为空,则直接返回WAL的数据
if(data.isEmpty()){
return wal;
}
/* //若是rowkey不相同,则返回wal的数据
if (!Bytes.toString(data.getRow()).equals(Bytes.toString(wal.getRow()))) {
return wal;
}*/
TreeMap<String,Cell> cellMap =newTreeMap<String,Cell>();
CellScanner dataScanner = data.cellScanner();
CellScanner walScanner = wal.cellScanner();
while(dataScanner.advance()){
Cell cell = dataScanner.current();
String cf =Bytes.toString(CellUtil.cloneFamily(cell));
String cq =Bytes.toString(CellUtil.cloneQualifier(cell));
String key = cf +"->"+ cq;
cellMap.put(key, cell);
}
while(walScanner.advance()){
Cell cell = walScanner.current();
String cf =Bytes.toString(CellUtil.cloneFamily(cell));
String cq =Bytes.toString(CellUtil.cloneQualifier(cell));
String key = cf +"->"+ cq;
cellMap.put(key, cell);
}
ArrayList<Cell> cells =newArrayList<Cell>();
cells.addAll(cellMap.values());
returnResult.create(cells);
}
值得一提的是,HBase返回的result中,列的排序是按照"列族名+列名"的字典排序。好比表中有["cf1:name","cf2:cellphone","cf1:age"] 三个列,那么返回的时候会排列成["cf1:age","cf1:name","cf2:cellphone"]。在建立新的Result对象的时候也必须遵循这样的规则,所以这里使用了treemap。不要问我为何,我特么调了一成天才发现这个问题。
进入hbase-indexer-engine的工程,执行mvn clean install -DskipTests
进行打包,稍等片刻便好了
在target下面有一个hbase-indexer-engine-1.5-cdh5.7.0.jar文件(这里的版本号对应本身的环境),将这个jar文件分发到集群的hbase-indexer的目录下,CDH版本放在在/opt/cloudera/parcels/CDH/jars/下便可。
而后重启服务进行测试。
数据跑了一天,Solr中对应的条数和HBase的同样。
所以咱们修改的代码是有效的。
上面咱们是合并了数据而后所有覆盖到Solr的,若是HBase存在大量的Update操做,那么势必每次列数都会和映射到Solr里面的列不一致,所以每次都会从HBase中get一次数据,这样确定会影响性能。那么咱们可否使用ReadRow.Never模式 + Solr的原子更新
的方式来实现呢?