HBase协处理器同步二级索引到Solr(续)


1、 已知的问题和不足

    在上一个版本中,实现了使用HBase的协处理器将HBase的二级索引同步到Solr中,可是仍旧有几个缺陷:java

  1. 写入Solr的Collection是写死在代码里面,且是惟一的。若是咱们有一张表的数据但愿将不一样的字段同步到Solr中该如何作呢?
  2. 目前全部配置相关信息都是写死到了代码中的,是否能够添加外部配置文件。
  3. 原来的方法是每次都须要编译新的Jar文件单独运行,可否将全部的同步使用一段通用的代码完成?

2、解决思路

针对上面的三个主要问题,咱们一一解决shell

  1. 一般一张表会对应多个SolrCollection以及不一样的Column。咱们可使用Map[表名->List[(Collection1,List[Columns]),(Collection2,List[Columns])...]]这样的类型,根据表名获取全部的Collection和Column。
  2. 经过Typesafe Config读取外部配置文件,达到全部信息可配的目的。
  3. 全部的数据都只有Put和Delete,只要咱们拦截到具体的消息以后判断当前的表名,而后根据问题一中的Collection和Column便可写入对应的SolrServer。在协处理器中获取表名的是e.getEnvironment().getRegion().getTableDesc().getTableName().getNameAsString()其中e是ObserverContext

3、代码

3.1 读取config文件内容

使用typesafe的config组件读取morphlines.conf文件,将内容转换为 Map<String,List<HBaseIndexerMappin>>。具体代码以下缓存

 
 
 
 
 
public class ConfigManager { private static SourceConfig sourceConfig = new SourceConfig(); public static Config config; static { sourceConfig.setConfigFiles("morphlines.conf"); config = sourceConfig.getConfig(); } public static Map<String,List<HBaseIndexerMappin>> getHBaseIndexerMappin(){ Map<String,List<HBaseIndexerMappin>> mappin = new HashMap<String, List<HBaseIndexerMappin>>(); Config mappinConf = config.getConfig("Mappin"); List<String> tables = mappinConf.getStringList("HBaseTables"); for (String table :tables){ List<Config> confList = (List<Config>) mappinConf.getConfigList(table); List<HBaseIndexerMappin> maps = new LinkedList<HBaseIndexerMappin>(); for(Config tmp :confList){ HBaseIndexerMappin map = new HBaseIndexerMappin(); map.solrConnetion = tmp.getString("SolrCollection"); map.columns = tmp.getStringList("Columns"); maps.add(map); } mappin.put(table,maps); } return mappin; }}

3.2 封装SolrServer的获取方式

由于目前我使用的环境是Solr和HBase公用的同一套Zookeeper,所以咱们彻底能够借助HBase的Zookeeper信息。HBase的协处理器是运行在HBase的环境中的,天然能够经过HBase的Configuration获取当前的Zookeeper节点和端口,而后轻松的获取到Solr的地址。并发

 
 
 
 
 
public class SolrServerManager implements LogManager { static Configuration conf = HBaseConfiguration.create(); public static String ZKHost = conf.get("hbase.zookeeper.quorum","bqdpm1,bqdpm2,bqdps2"); public static String ZKPort = conf.get("hbase.zookeeper.property.clientPort","2181"); public static String SolrUrl = ZKHost + ":" + ZKPort + "/" + "solr"; public static int zkClientTimeout = 1800000;// 心跳 public static int zkConnectTimeout = 1800000;// 链接时间 public static CloudSolrServer create(String defaultCollection){ log.info("Create SolrCloudeServer .This collection is " + defaultCollection); CloudSolrServer solrServer = new CloudSolrServer(SolrUrl); solrServer.setDefaultCollection(defaultCollection); solrServer.setZkClientTimeout(zkClientTimeout); solrServer.setZkConnectTimeout(zkConnectTimeout); return solrServer; }}

3.3 编写提交数据到Solr的代码

理想状态下,咱们时时刻刻都须要提交数据到Solr中,可是事实上咱们数据写入的时间是比较分散的,可能集中再每一天的某几个时间点。所以咱们必须保证在高并发下能达到必定数据量自动提交,在低并发的状况下能隔一段时间写入一次。只有两种机制并存的状况下才能保证数据能即时写入。app

 
 
 
 
 
public class SolrCommitTimer extends TimerTask implements LogManager { public Map<String,List<SolrInputDocument>> putCache = new HashMap<String, List<SolrInputDocument>>();//Collection名字->更新(插入)操做缓存 public Map<String,List<String>> deleteCache = new HashMap<String, List<String>>();//Collection名字->删除操做缓存 Map<String,CloudSolrServer> solrServers = new HashMap<String, CloudSolrServer>();//Collection名字->SolrServers int maxCache = ConfigManager.config.getInt("MaxCommitSize"); // 任什么时候候,保证只能有一个线程在提交索引,并清空集合 final static Semaphore semp = new Semaphore(1); //添加Collection和SolrServer public void addCollecttion(String collection,CloudSolrServer server){ this.solrServers.put(collection,server); }//往Solr添加(更新)数据 public UpdateResponse put(CloudSolrServer server,SolrInputDocument doc) throws IOException, SolrServerException { server.add(doc); return server.commit(false, false); }//往Solr添加(更新)数据 public UpdateResponse put(CloudSolrServer server,List<SolrInputDocument> docs) throws IOException, SolrServerException { server.add(docs); return server.commit(false, false); }//根据ID删除Solr数据 public UpdateResponse delete(CloudSolrServer server,String rowkey) throws IOException, SolrServerException { server.deleteById(rowkey); return server.commit(false, false); }//根据ID删除Solr数据 public UpdateResponse delete(CloudSolrServer server,List<String> rowkeys) throws IOException, SolrServerException { server.deleteById(rowkeys); return server.commit(false, false); }//将doc添加到缓存 public void addPutDocToCache(String collection, SolrInputDocument doc) throws IOException, SolrServerException, InterruptedException { semp.acquire(); log.debug("addPutDocToCache:" + "collection=" + collection + "data=" + doc.toString()); if(!putCache.containsKey(collection)){ List<SolrInputDocument> docs = new LinkedList<SolrInputDocument>(); docs.add(doc); putCache.put(collection,docs); }else { List<SolrInputDocument> cache = putCache.get(collection); cache.add(doc); if (cache.size() >= maxCache) { try { this.put(solrServers.get(collection), cache); } finally { putCache.get(collection).clear(); } } } semp.release();//释放信号量 }//添加删除操做到缓存 public void addDeleteIdCache(String collection,String rowkey) throws IOException, SolrServerException, InterruptedException { semp.acquire(); log.debug("addDeleteIdCache:" + "collection=" + collection + "rowkey=" + rowkey); if(!deleteCache.containsKey(collection)){ List<String> rowkeys = new LinkedList<String>(); rowkeys.add(rowkey); deleteCache.put(collection,rowkeys); }else{ List<String> cache = deleteCache.get(collection); cache.add(rowkey); if (cache.size() >= maxCache) { try{ this.delete(solrServers.get(collection),cache); }finally { putCache.get(collection).clear(); } } } semp.release();//释放信号量 } @Override public void run() { try { semp.acquire(); log.debug("开始插入...."); Set<String> collections = solrServers.keySet(); for(String collection:collections){ if(putCache.containsKey(collection) && (!putCache.get(collection).isEmpty()) ){ this.put(solrServers.get(collection),putCache.get(collection)); putCache.get(collection).clear(); } if(deleteCache.containsKey(collection) && (!deleteCache.get(collection).isEmpty())){ this.delete(solrServers.get(collection),deleteCache.get(collection)); deleteCache.get(collection).clear(); } } } catch (InterruptedException e) { e.printStackTrace(); } catch (Exception e) { log.error("Commit putCache to Solr error!Because :" + e.getMessage()); }finally { semp.release();//释放信号量 } }}

3.4 拦截HBase的Put和Delete操做信息

在每一个prePut和preDelete中拦截操做信息,记录表名、列名、值。将这些信息根据表名和Collection名进行分类写入缓存。ide

 
 
 
 
 
public class HBaseIndexerToSolrObserver extends BaseRegionObserver implements LogManager{ Map<String,List<HBaseIndexerMappin>> mappins = ConfigManager.getHBaseIndexerMappin(); Timer timer = new Timer(); int maxCommitTime = ConfigManager.config.getInt("MaxCommitTime"); //最大提交时间,s SolrCommitTimer solrCommit = new SolrCommitTimer(); public HBaseIndexerToSolrObserver(){ log.info("Initialization HBaseIndexerToSolrObserver ..."); for(Map.Entry<String,List<HBaseIndexerMappin>> entry : mappins.entrySet() ){ List<HBaseIndexerMappin> solrmappin = entry.getValue(); for(HBaseIndexerMappin map:solrmappin){ String collection = map.solrConnetion;//获取Collection名字 log.info("Create Solr Server connection .The collection is " + collection); CloudSolrServer solrserver = SolrServerManager.create(collection);//根据Collection初始化SolrServer链接 solrCommit.addCollecttion(collection,solrserver); } } timer.schedule(solrCommit, 10 * 1000L, maxCommitTime * 1000L); } @Override public void postPut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException { String table = e.getEnvironment().getRegion().getTableDesc().getTableName().getNameAsString();//获取表名 String rowkey= Bytes.toString(put.getRow());//获取主键 SolrInputDocument doc = new SolrInputDocument(); List<HBaseIndexerMappin> mappin = mappins.get(table); for(HBaseIndexerMappin mapp : mappin){ for(String column : mapp.columns){ String[] tmp = column.split(":"); String cf = tmp[0]; String cq = tmp[1]; if(put.has(Bytes.toBytes(cf),Bytes.toBytes(cq))){ Cell cell = put.get(Bytes.toBytes(cf),Bytes.toBytes(cq)).get(0);//获取制定列的数据 Map<String, String > operation = new HashMap<String,String>(); operation.put("set",Bytes.toString(CellUtil.cloneValue(cell))); doc.setField(cq,operation);//使用原子更新的方式将HBase二级索引写入Solr } } doc.addField("id",rowkey); try { solrCommit.addPutDocToCache(mapp.solrConnetion,doc);//添加doc到缓存 } catch (SolrServerException e1) { e1.printStackTrace(); } catch (InterruptedException e1) { e1.printStackTrace(); } } } @Override public void postDelete(ObserverContext<RegionCoprocessorEnvironment> e, Delete delete, WALEdit edit, Durability durability) throws IOException{ String table = e.getEnvironment().getRegion().getTableDesc().getTableName().getNameAsString(); String rowkey= Bytes.toString(delete.getRow()); List<HBaseIndexerMappin> mappin = mappins.get(table); for(HBaseIndexerMappin mapp : mappin){ try { solrCommit.addDeleteIdCache(mapp.solrConnetion,rowkey);//添加删除操做到缓存 } catch (SolrServerException e1) { e1.printStackTrace(); } catch (InterruptedException e1) { e1.printStackTrace(); } } }}

4、 使用

首先须要添加morphlines.conf文件。里面包含了须要同步数据到Solr的HBase表名、对应的Solr Collection的名字、要同步的列、多久提交一次、最大批次容量的相关信息。具体配置以下:高并发

 
 
 
 
 
#最大提交时间(单位:秒)MaxCommitTime = 30#最大批次容量MaxCommitSize = 10000Mappin { HBaseTables: ["HBASE_OBSERVER_TEST"] #须要同步的HBase表名 "HBASE_OBSERVER_TEST": [ { SolrCollection: "bqjr" #Solr Collection名字 Columns: [ "cf1:test_age", #须要同步的列,格式<列族:列> "cf1:test_name" ] }, ]}

该配置文件默认放在各个节点的/etc/hbase/conf/下。若是你但愿将配置文件路径修改成其余路径,请修改com.bqjr.bigdata.HBaseObserver.comm.config.SourceConfig类中的configHome路径。post

而后将代码打包,上传到HDFS中,将协处理器添加到对应的表中。ui

 
 
 
 
 
#先禁用这张表disable 'HBASE_OBSERVER_TEST'#为这张表添加协处理器,设置的参数具体为: jar文件路径|类名|优先级(SYSTEM或者USER)alter 'HBASE_OBSERVER_TEST','coprocessor'=>'hdfs://hostname:8020/ext_lib/HBaseObserver-1.0.0.jar|com.bqjr.bigdata.HBaseObserver.server.HBaseIndexerToSolrObserver||'#启用这张表enable 'HBASE_OBSERVER_TEST'#删除某个协处理器,"$<bumber>"后面跟的ID号与desc里面的ID号相同alter 'HBASE_OBSERVER_TEST',METHOD=>'table_att_unset',NAME => 'coprocessor$1'

若是须要新增一张表同步到Solr。只须要修改morphlines.conf文件,分发倒各个节点。而后将协处理器添加到HBase表中,这样就不用再次修改代码了。this

相关文章
相关标签/搜索