使用Hbase协做器(Coprocessor)同步数据到ElasticSearch

使用Hbase协做器(Coprocessor)同步数据到ElasticSearch
最近项目中须要将Hbase中的数据同步到ElasticSearch中,需求就是只要往Hbase里面put或者delete数据,那么ES集群中,相应的索引下,也须要更新或者删除这条数据。本人使用了hbase-rirver插件,发现并无那么好用,因而到网上找了一些资料,本身整理研究了一下,就本身写了一个同步数据的组件,基于Hbase的协做器,效果还不错,如今共享给你们,若是你们发现什么须要优化或者改正的地方,能够在个人csdn博客:个人csdn博客地址上面私信我给我留言,代码托管在码云上Hbase-Observer-ElasticSearch。同时要感谢Gavin Zhang 2shou,我虽然不认识Gavin Zhang 2shou,(2shou的同步数据博文)可是我是看了他写的代码以及博客以后,(2shou的同步组件代码)在他的基础之上对代码作了部分优化以及调整,来知足我自己的需求,因此在此表示感谢,但愿我把个人代码开源出来,其余人看到以后也能激发大家的灵感,来写出更多更好更加实用的东西:java

Hbase协做器(Coprocessor)
编写组件
部署组件
验证组件
总结
Hbase协做器(Coprocessor)
HBase 0.92版本后推出了Coprocessor — 协处理器,一个工做在Master/RegionServer中的框架,能运行用户的代码,从而灵活地完成分布式数据处理的任务。node

HBase 支持两种类型的协处理器,Endpoint 和 Observer。Endpoint 协处理器相似传统数据库中的存储过程,客户端能够调用这些 Endpoint 协处理器执行一段 Server 端代码,并将 Server 端代码的结果返回给客户端进一步处理,最多见的用法就是进行汇集操做。若是没有协处理器,当用户须要找出一张表中的最大数据,即 max 聚合操做,就必须进行全表扫描,在客户端代码内遍历扫描结果,并执行求最大值的操做。这样的方法没法利用底层集群的并发能力,而将全部计算都集中到 Client 端统一执行,势必效率低下。利用 Coprocessor,用户能够将求最大值的代码部署到 HBase Server 端,HBase 将利用底层 cluster 的多个节点并发执行求最大值的操做。即在每一个 Region 范围内执行求最大值的代码,将每一个 Region 的最大值在 Region Server 端计算出,仅仅将该 max 值返回给客户端。在客户端进一步将多个 Region 的最大值进一步处理而找到其中的最大值。这样总体的执行效率就会提升不少。
另一种协处理器叫作 Observer Coprocessor,这种协处理器相似于传统数据库中的触发器,当发生某些事件的时候这类协处理器会被 Server 端调用。Observer Coprocessor 就是一些散布在 HBase Server 端代码中的 hook 钩子,在固定的事件发生时被调用。好比:put 操做以前有钩子函数 prePut,该函数在 put 操做执行前会被 Region Server 调用;在 put 操做以后则有 postPut 钩子函数。
在实际的应用场景中,第二种Observer Coprocessor应用起来会比较多一点,由于第二种方式比较灵活,能够针对某张表进行绑定,假如hbase有十张表,我只想绑定其中的5张表,另外五张不须要处理,就不绑定便可,下面我要介绍的也是第二种方式。
编写组件
首先编写一个ESClient客户端,用于连接访问的ES集群代码。shell

package org.eminem.hbase.observer;

import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.lang3.StringUtils;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;

import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;

/**
 * ES Cleint class
 */
public class ESClient {

    // ElasticSearch的集群名称
    public static String clusterName;
    // ElasticSearch的host
    public static String nodeHost;
    // ElasticSearch的端口(Java API用的是Transport端口,也就是TCP)
    public static int nodePort;
    // ElasticSearch的索引名称
    public static String indexName;
    // ElasticSearch的类型名称
    public static String typeName;
    // ElasticSearch Client
    public static Client client;

    /**
     * get Es config
     *
     * @return
     */
    public static String getInfo() {
        List<String> fields = new ArrayList<String>();
        try {
            for (Field f : ESClient.class.getDeclaredFields()) {
                fields.add(f.getName() + "=" + f.get(null));
            }
        } catch (IllegalAccessException ex) {
            ex.printStackTrace();
        }
        return StringUtils.join(fields, ", ");
    }

    /**
     * init ES client
     */
    public static void initEsClient() {
        Settings settings = ImmutableSettings.settingsBuilder()
                .put("cluster.name", ESClient.clusterName).build();
        client = new TransportClient(settings)
                .addTransportAddress(new InetSocketTransportAddress(
                        ESClient.nodeHost, ESClient.nodePort));
    }

    /**
     * Close ES client
     */
    public static void closeEsClient() {
        client.close();
    }
}


而后编写一个Class类,继承BaseRegionObserver,并复写其中的start()、stop()、postPut()、postDelete()、四个方法。这四个方法其实很好理解,分别表示协做器开始、协做器结束、put事件触发并将数据存入hbase以后咱们能够作一些事情,delete事件触发并将数据从hbase删除以后咱们能够作一些事情。咱们只要将初始化ES客户端的代码写在start中,在stop中关闭ES客户端以及定义好的Scheduled对象便可。两个触发事件分别bulk hbase中的数据到ES,就轻轻松松的搞定了。数据库

package org.eminem.hbase.observer;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;

/**
 * Hbase Sync data to Es Class
 */
public class HbaseDataSyncEsObserver extends BaseRegionObserver {

    private static final Log LOG = LogFactory.getLog(HbaseDataSyncEsObserver.class);


    /**
     * read es config from params
     * @param env
     */
    private static void readConfiguration(CoprocessorEnvironment env) {
        Configuration conf = env.getConfiguration();
        ESClient.clusterName = conf.get("es_cluster");
        ESClient.nodeHost = conf.get("es_host");
        ESClient.nodePort = conf.getInt("es_port", -1);
        ESClient.indexName = conf.get("es_index");
        ESClient.typeName = conf.get("es_type");
    }

    /**
     *  start
     * @param e
     * @throws IOException
     */
    @Override
    public void start(CoprocessorEnvironment e) throws IOException {
        // read config
         readConfiguration(e);
         // init ES client
         ESClient.initEsClient();
        LOG.error("------observer init EsClient ------"+ESClient.getInfo());
    }

    /**
     * stop
     * @param e
     * @throws IOException
     */
    @Override
    public void stop(CoprocessorEnvironment e) throws IOException {
        // close es client
       ESClient.closeEsClient();
       // shutdown time task
       ElasticSearchBulkOperator.shutdownScheduEx();
    }

    /**
     * Called after the client stores a value
     * after data put to hbase then prepare update builder to bulk  ES
     *
     * @param e
     * @param put
     * @param edit
     * @param durability
     * @throws IOException
     */
    @Override
    public void postPut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException {
        String indexId = new String(put.getRow());
        try {
            NavigableMap<byte[], List<Cell>> familyMap = put.getFamilyCellMap();
            Map<String, Object> infoJson = new HashMap<String, Object>();
            Map<String, Object> json = new HashMap<String, Object>();
            for (Map.Entry<byte[], List<Cell>> entry : familyMap.entrySet()) {
                for (Cell cell : entry.getValue()) {
                    String key = Bytes.toString(CellUtil.cloneQualifier(cell));
                    String value = Bytes.toString(CellUtil.cloneValue(cell));
                    json.put(key, value);
                }
            }
            // set hbase family to es
            infoJson.put("info", json);
            ElasticSearchBulkOperator.addUpdateBuilderToBulk(ESClient.client.prepareUpdate(ESClient.indexName, ESClient.typeName, indexId).setDocAsUpsert(true).setDoc(infoJson));
        } catch (Exception ex) {
            LOG.error("observer put  a doc, index [ " + ESClient.indexName + " ]" + "indexId [" + indexId + "] error : " + ex.getMessage());
        }
    }


    /**
     * Called after the client deletes a value.
     * after data delete from hbase then prepare delete builder to bulk  ES
     * @param e
     * @param delete
     * @param edit
     * @param durability
     * @throws IOException
     */
    @Override
    public void postDelete(ObserverContext<RegionCoprocessorEnvironment> e, Delete delete, WALEdit edit, Durability durability) throws IOException {
        String indexId = new String(delete.getRow());
        try {
            ElasticSearchBulkOperator.addDeleteBuilderToBulk(ESClient.client.prepareDelete(ESClient.indexName, ESClient.typeName, indexId));
        } catch (Exception ex) {
            LOG.error(ex);
            LOG.error("observer delete  a doc, index [ " + ESClient.indexName + " ]" + "indexId [" + indexId + "] error : " + ex.getMessage());

        }
    }
}

这段代码中info节点是根据我这边自身的需求加的,你们能够结合自身需求,去掉这个info节点,直接将hbase中的字段写入到ES中去。咱们的需求须要把hbase的Family也要插入到ES中。apache

最后就是比较关键的bulk ES代码,结合2shou的代码,我本身写的这部分代码,没有使用Timer,而是使用了ScheduledExecutorService,至于为何不使用Timer,你们能够去百度上面搜索下这两个东东的区别,我在这里就不作过多的介绍了。在ElasticSearchBulkOperator这个类中,我使用ScheduledExecutorService周期性的执行一个任务,去判断缓冲池中,是否有须要bulk的数据,阀值是10000.每30秒执行一次,若是达到阀值,那么就会当即将缓冲池中的数据bulk到ES中,并清空缓冲池中的数据,等待下一次定时任务的执行。固然,初始化定时任务须要一个beeper响铃的线程,delay时间10秒。还有一个很重要的就是须要对bulk的过程进行加锁操做。json

package org.eminem.hbase.observer;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Bulk hbase data to ElasticSearch Class
 */
public class ElasticSearchBulkOperator {

    private static final Log LOG = LogFactory.getLog(ElasticSearchBulkOperator.class);

    private static final int MAX_BULK_COUNT = 10000;

    private static BulkRequestBuilder bulkRequestBuilder = null;

    private static final Lock commitLock = new ReentrantLock();

    private static ScheduledExecutorService scheduledExecutorService = null;

    static {
        // init es bulkRequestBuilder
        bulkRequestBuilder = ESClient.client.prepareBulk();
        bulkRequestBuilder.setRefresh(true);

        // init thread pool and set size 1
        scheduledExecutorService = Executors.newScheduledThreadPool(1);

        // create beeper thread( it will be sync data to ES cluster)
        // use a commitLock to protected bulk es as thread-save
        final Runnable beeper = new Runnable() {
            public void run() {
                commitLock.lock();
                try {
                    bulkRequest(0);
                } catch (Exception ex) {
                    System.out.println(ex.getMessage());
                    LOG.error("Time Bulk " + ESClient.indexName + " index error : " + ex.getMessage());
                } finally {
                    commitLock.unlock();
                }
            }
        };

        // set time bulk task
        // set beeper thread(10 second to delay first execution , 30 second period between successive executions)
        scheduledExecutorService.scheduleAtFixedRate(beeper, 10, 30, TimeUnit.SECONDS);

    }

    /**
     * shutdown time task immediately
     */
    public static void shutdownScheduEx() {
        if (null != scheduledExecutorService && !scheduledExecutorService.isShutdown()) {
            scheduledExecutorService.shutdown();
        }
    }

    /**
     * bulk request when number of builders is grate then threshold
     *
     * @param threshold
     */
    private static void bulkRequest(int threshold) {
        if (bulkRequestBuilder.numberOfActions() > threshold) {
            BulkResponse bulkItemResponse = bulkRequestBuilder.execute().actionGet();
            if (!bulkItemResponse.hasFailures()) {
                bulkRequestBuilder = ESClient.client.prepareBulk();
            }
        }
    }

    /**
     * add update builder to bulk
     * use commitLock to protected bulk as thread-save
     * @param builder
     */
    public static void addUpdateBuilderToBulk(UpdateRequestBuilder builder) {
        commitLock.lock();
        try {
            bulkRequestBuilder.add(builder);
            bulkRequest(MAX_BULK_COUNT);
        } catch (Exception ex) {
            LOG.error(" update Bulk " + ESClient.indexName + " index error : " + ex.getMessage());
        } finally {
            commitLock.unlock();
        }
    }

    /**
     * add delete builder to bulk
     * use commitLock to protected bulk as thread-save
     *
     * @param builder
     */
    public static void addDeleteBuilderToBulk(DeleteRequestBuilder builder) {
        commitLock.lock();
        try {
            bulkRequestBuilder.add(builder);
            bulkRequest(MAX_BULK_COUNT);
        } catch (Exception ex) {
            LOG.error(" delete Bulk " + ESClient.indexName + " index error : " + ex.getMessage());
        } finally {
            commitLock.unlock();
        }
    }
}


至此,代码已经所有完成了,接下来只须要咱们打包部署便可。并发

部署组件
使用maven打包框架

mvn clean package


使用shell命令上传到hdfselasticsearch

hadoop fs -put hbase-observer-elasticsearch-1.0-SNAPSHOT-zcestestrecord.jar /hbase_es 
hadoop fs -chmod -R 777 /hbase_es 

验证组件
hbase shellmaven

create 'test_record','info'

disable 'test_record'

alter 'test_record', METHOD => 'table_att', 'coprocessor' => 'hdfs:///hbase_es/hbase-observer-elasticsearch-1.0-SNAPSHOT-zcestestrecord.jar|org.eminem.hbase.observer.HbaseDataSyncEsObserver|1001|es_cluster=zcits,es_type=zcestestrecord,es_index=zcestestrecord,es_port=9100,es_host=master'

enable 'test_record'

put 'test_record','test1','info:c1','value1'
deleteall 'test_record','test1'


绑定操做以前须要,在ES集群中创建好相应的索引如下是对绑定代码的解释: 
把Java项目打包为jar包,上传到HDFS的特定路径 
进入HBase Shell,disable你但愿加载的表 
经过alert 命令激活Observer 
coprocessor对应的格式以|分隔,依次为: 
- jar包的HDFS路径 
- Observer的主类 
- 优先级(通常不用改) 
- 参数(通常不用改) 
- 新安装的coprocessor会自动生成名称:coprocessor + $ + 序号(经过describe table_name可查看)

之后对jar包内容作了调整,须要从新打包并绑定新jar包,再绑定以前须要作目标表作解绑操做,加入目标表以前绑定了同步组件的话,如下是解绑的命令

hbase shell

disable 'test_record'
alter 'test_record', METHOD => 'table_att_unset',NAME => 'coprocessor$1'
enable 'test_record'
desc 'test_record'


总结
绑定以后若是在执行的过程当中有报错或者同步不过去,能够到hbase的从节点上的logs目录下,查看hbase-roor-regionserver-slave*.log文件。由于协做器是部署在regionserver上的,因此要到从节点上面去看日志,而不是master节点。

hbase-river插件以前下载了源代码看了下,hbase-river插件是周期性的scan整张表进行bulk操做,而咱们这里本身写的这个组件呢,是基于hbase的触发事件来进行的,二者的效果和性能不言而喻,一个是全量的,一个是增量的,咱们在实际的开发中,确定是但愿若是有数据更新了或者删除了,咱们只要对着部分数据进行同步就好了,没有修改或者删除的数据,咱们能够不用去理会。

Timer和 ScheduledExecutorService,在这里我选择了ScheduledExecutorService,2shou以前提到过部署插件有个坑,修改Java代码后,上传到HDFS的jar包文件必须和以前不同,不然就算卸载掉原有的coprocessor再从新安装也不能生效,这个坑我也碰到了,就是由于没有复写stop方法,将定时任务停掉,线程一直会挂在那里,并且一旦报错将会致使hbase没法启动,必需要kill掉相应的线程。这个坑,坑了我一段时间,你们千万要注意,必定记得要复写stop方法,关闭以前打开的线程或者客户端,这样才是最好的方式。

相关文章
相关标签/搜索