HBase 二级索引与Coprocessor协处理器

 

Coprocessor简介java

(1)实现目的
数据库

  1. HBase没法轻易创建“二级索引”;
  2. 执行求和、计数、排序等操做比较困难,必须经过MapReduce/Spark实现,对于简单的统计或聚合计算时,可能会由于网络与IO开销大而带来性能问题。

(2)灵感来源

         灵感来源于Bigtable的协处理器,包含以下特性:
apache

  1. 每一个表服务器的任意子表均可以运行代码;
  2. 客户端可以直接访问数据表的行,多行读写会自动分片成多个并行的RPC调用。

(3)提供接口
json

  1. RegionObserver:提供客户端的数据操纵事件钩子:Get、Put、Delete、Scan等;
  2. WALObserver:提供WAL相关操做钩子;
  3. MasterObserver:提供DDL-类型的操做钩子。如建立、删除、修改数据表等;
  4. Endpoint:终端是动态RPC插件的接口,它的实现代码被安装在服务器端,可以经过HBase RPC调用唤醒。

(4)应用范围
服务器

  1. 经过使用RegionObserver接口能够实现二级索引的建立和维护;
  2. 经过使用Endpoint接口,在对数据进行简单排序和sum,count等统计操做时,可以极大提升性能。

 

Endpoint服务端实现
网络

  在传统关系型数据库里面,能够随时的对某列进行求和sum,可是目前HBase目前所提供的接口,直接求和是比较困难的,因此先编写好服务端代码,并加载到对应的Table上,加载协处理器有几种方法,能够经过HTableDescriptor的addCoprocessor方法直接加载,同理也能够经过removeCoprocessor方法卸载协处理器。

  Endpoint协处理器相似传统数据库的存储过程,客户端调用Endpoint协处理器执行一段Server端代码,并将Server端代码的结果返回给Client进一步处理,最多见的用法就是进行聚合操做。举个例子说明:若是没有协处理器,当用户须要找出一张表中的最大数据即max聚合操做,必须进行全表扫描,客户端代码遍历扫描结果并执行求max操做,这样的方法没法利用底层集群的并发能力,而将全部计算都集中到Client端统一执行, 效率很是低。可是使用Coprocessor,用户将求max的代码部署到HBase Server端,HBase将利用底层Cluster的多个节点并行执行求max的操做即在每一个Region范围内执行求最大值逻辑,将每一个Region的最大值在Region Server端计算出,仅仅将该max值返回给客户端。客户端进一步将多个Region的max进一步处理而找到其中的max,这样总体执行效率提升不少。可是必定要注意的是Coprocessor必定要写正确,不然致使RegionServer宕机。架构

 

Protobuf定义并发

 如前所述,客户端和服务端之间须要进行RPC通讯,因此二者间须要肯定接口,当前版本的HBase的协处理器是经过Google Protobuf协议来实现数据交换的,因此须要经过Protobuf来定义接口。
以下所示:框架

option java_package = "com.my.hbase.protobuf.generated";
option java_outer_classname = "AggregateProtos";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
option optimize_for = SPEED;

import "Client.proto";

message AggregateRequest {
    required string interpreter_class_name = 1;
    required Scan scan = 2;
    optional bytes  interpreter_specific_bytes = 3;
}

message AggregateResponse {
    repeated bytes first_part = 1;
    optional bytes second_part = 2;
}

service AggregateService {
    rpc GetMax (AggregateRequest) returns (AggregateResponse);
    rpc GetMin (AggregateRequest) returns (AggregateResponse);
    rpc GetSum (AggregateRequest) returns (AggregateResponse);
    rpc GetRowNum (AggregateRequest) returns (AggregateResponse);
    rpc GetAvg (AggregateRequest) returns (AggregateResponse);
    rpc GetStd (AggregateRequest) returns (AggregateResponse);
    rpc GetMedian (AggregateRequest) returns (AggregateResponse);
}

  能够看到这里定义7个聚合服务RPC,名字分别叫作GetMax、GetMin、GetSum等,本文经过GetSum进行举例,其余的聚合RPC也是相似的内部实现。RPC有一个入口参数,用消息AggregateRequest表示;RPC的返回值用消息AggregateResponse表示。Service是一个抽象概念,RPC的Server端能够看做一个用来提供服务的Service。在HBase Coprocessor中Service就是Server端须要提供的Endpoint Coprocessor服务,主要用来给HBase的Client提供服务。AggregateService.java是由Protobuf软件经过终端命令“protoc filename.proto--java_out=OUT_DIR”自动生成的,其做用是将.proto文件定义的消息结构以及服务转换成对应接口的RPC实现,其中包括如何构建request消息和response响应以及消息包含的内容的处理方式,而且将AggregateService包装成一个抽象类,具体的服务以类的方法的形式提供。AggregateService.java定义Client端与Server端通讯的协议,代码中包含请求信息结构AggregateRequest、响应信息结构AggregateResponse、提供的服务种类AggregateService,其中AggregateRequest中的interpreter_class_name指的是column interpreter的类名,此类的做用在于将数据格式从存储类型解析成所需类型。ide

 

服务端的架构

  首先,Endpoint Coprocessor是一个Protobuf Service的实现,所以须要它必须继承某个ProtobufService。咱们在前面已经经过proto文件定义Service,命名为AggregateService,所以Server端代码须要重载该类,其次做为HBase的协处理器,Endpoint 还必须实现HBase定义的协处理器协议,用Java的接口来定义。具体来讲就是CoprocessorService和Coprocessor,这些HBase接口负责将协处理器和HBase 的RegionServer等实例联系起来以便协同工做。Coprocessor接口定义两个接口函数:start和stop。
  加载Coprocessor以后Region打开的时候被RegionServer自动加载,并会调用器start 接口完成初始化工做。通常状况该接口函数仅仅须要将协处理器的运行上下文环境变量CoprocessorEnvironment保存到本地便可。
  CoprocessorEnvironment保存协处理器的运行环境,每一个协处理器都是在一个RegionServer进程内运行并隶属于某个Region。经过该变量获取Region的实例等 HBase运行时环境对象。
  Coprocessor接口还定义stop()接口函数,该函数在Region被关闭时调用,用来进行协处理器的清理工做。本文里咱们没有进行任何清理工做,所以该函数什么也不干。
  咱们的协处理器还须要实现CoprocessorService接口。该接口仅仅定义一个接口函数 getService()。咱们仅须要将本实例返回便可。HBase的Region Server在接收到客户端的调用请求时,将调用该接口获取实现RPCService的实例,所以本函数通常状况下就是返回自身实例便可。
  完成以上三个接口函数以后,Endpoint的框架代码就已完成。每一个Endpoint协处理器都必须实现这些框架代码并且写法雷同。

  Server端的代码就是一个Protobuf RPC的Service实现,即经过Protobuf提供的某种服务。其开发内容主要包括:

  1. 实现Coprocessor的基本框架代码
  2. 实现服务的RPC具体代码


Endpoint 协处理的基本框架

Endpoint 是一个Server端Service的具体实现,其实现有一些框架代码,这些框架代码与具体的业务需求逻辑无关。仅仅是为了和HBase运行时环境协同工做而必须遵循和完成的一些粘合代码。所以多数状况下仅仅须要从一个例子程序拷贝过来并进行命名修改便可。不过咱们仍是完整地对这些粘合代码进行粗略的讲解以便更好地理解代码。

public Service getService() {
     return this;
}

public void start(CoprocessorEnvironment env) throws IOException {
     if(env instanceof RegionCoprocessorEnvironment) {
         this.env = (RegionCoprocessorEnvironment)env;
     } else {
         throw new CoprocessorException("Must be loaded on a table region!");
     }
}

public void stop(CoprocessorEnvironment env) throws IOException {
}

Endpoint协处理器真正的业务代码都在每个RPC函数的具体实现中。
在本文中,咱们的Endpoint协处理器仅提供一个RPC函数即getSUM。我将分别介绍编写该函数的几个主要工做:了解函数的定义,参数列表;处理入口参数;实现业务逻辑;设置返回参数。

public void getSum(RpcController controller, AggregateRequest request, RpcCallbackdone) {
        AggregateResponse response = null;
        RegionScanner scanner = null;
        long sum = 0L;
        try {
            ColumnInterpreter ignored = this.constructColumnInterpreterFromRequest(request);
            Object sumVal = null;
            Scan scan = ProtobufUtil.toScan(request.getScan());
            scanner = this.env.getRegion().getScanner(scan);
            byte[] colFamily = scan.getFamilies()[0];
            NavigableSet qualifiers = (NavigableSet) scan.getFamilyMap().get(colFamily);
            byte[] qualifier = null;
            if (qualifiers != null && !qualifiers.isEmpty()) {
                qualifier = (byte[]) qualifiers.pollFirst();
            }

            ArrayList results = new ArrayList();
            boolean hasMoreRows = false;

            do {
                hasMoreRows = scanner.next(results);
                int listSize = results.size();

                for (int i = 0; i < listSize; ++i) {
                    //取出列值
                    Object temp = ignored.getValue(colFamily, qualifier,
                            (Cell) results.get(i));
                    if (temp != null) {
                        sumVal = ignored.add(sumVal, ignored.castToReturnType(temp));
                    }
                }

                results.clear();
            } while (hasMoreRows);

            if (sumVal != null) {
                response = AggregateResponse.newBuilder().addFirstPart(
                        ignored.getProtoForPromotedType(sumVal).toByteString()).build();
            }
        } catch (IOException var27) {
            ResponseConverter.setControllerException(controller, var27);
        } finally {
            if (scanner != null) {
                try {
                    scanner.close();
                } catch (IOException var26) {
                    ;
                }
            }

        }

        log.debug("Sum from this region is " +
                this.env.getRegion().getRegionInfo().getRegionNameAsString() + ": " + sum);
        done.run(response);
    }

Endpoint类比于数据库的存储过程,其触发服务端的基于Region的同步运行再将各个结果在客户端搜集后归并计算。特色相似于传统的MapReduce框架,服务端Map客户端Reduce。

 

Endpoint客户端实现

HBase提供客户端Java包org.apache.hadoop.hbase.client.HTable,提供如下三种方法来调用协处理器提供的服务:

  1. coprocessorService(byte[])
  2. coprocessorService(Class, byte[], byte[],Batch.Call),
  3. coprocessorService(Class, byte[], byte[],Batch.Call, Batch.Callback)

  该方法采用rowkey指定Region。这是由于HBase客户端不多会直接操做Region,通常不须要知道Region的名字;何况在HBase中Region名会随时改变,因此用rowkey来指定Region是最合理的方式。使用rowkey能够指定惟一的一个Region,若是给定的Rowkey并不存在,只要在某个Region的rowkey范围内依然用来指定该Region。好比Region 1处理[row1, row100]这个区间内的数据,则rowkey=row1就由Region 1来负责处理,换句话说咱们能够用row1来指定Region 1,不管rowkey等于”row1”的记录是否存在。CoprocessorService方法返回类型为CoprocessorRpcChannel的对象,该 RPC通道链接到由rowkey指定的Region上面,经过此通道能够调用该Region上面部署的协处理器RPC。

  有时候客户端须要调用多个 Region上的同一个协处理器,好比须要统计整个Table的sum,在这种状况下,须要全部的Region都参与进来,分别统计自身Region内部的sum并返回客户端,最终客户端将全部Region的返回结果汇总,就能够获得整张表的sum。

  这意味着该客户端同时和多个Region进行批处理交互。一个可行的方法是,收集每一个 Region的startkey,而后循环调用第一种coprocessorService方法:用每个Region的startkey 做为入口参数,得到RPC通道建立stub对象,进而逐一调用每一个Region上的协处理器RPC。这种作法须要写不少的代码,为此HBase提供两种更加简单的 coprocessorService方法来处理多个Region的协处理器调用。先来看第一种方法 coprocessorService(Class, byte[],byte[],Batch.Call)

  该方法有 4 个入口参数。第一个参数是实现RPC的Service 类,即前文中的AggregateService类。经过它,HBase就能够找到相应的部署在Region上的协处理器,一个Region上能够部署多个协处理器,客户端必须经过指定Service类来区分究竟须要调用哪一个协处理器提供的服务。
  要调用哪些Region上的服务则由startkey和endkey来肯定,经过 rowkey范围便可肯定多个 Region。为此,coprocessorService方法的第二个和第三个参数分别是 startkey和endkey,凡是落在[startkey,endkey]区间内的Region都会参与本次调用。
  第四个参数是接口类Batch.Call。它定义了如何调用协处理器,用户经过重载该接口的call()方法来实现客户端的逻辑。在call()方法内,能够调用RPC,并对返回值进行任意处理。即前文代码清单1中所作的事情。coprocessorService将负责对每一个 Region调用这个call()方法。
  coprocessorService方法的返回值是一个Map类型的集合。该集合的key是Region名字,value是Batch.Call.call方法的返回值。该集合能够看做是全部Region的协处理器 RPC 返回的结果集。客户端代码能够遍历该集合对全部的结果进行汇总处理。
  这种coprocessorService方法的大致工做流程以下。首先它分析startkey和 endkey,找到该区间内的全部Region,假设存放在regionList中。而后,遍历regionList,为每个Region调用Batch.Call,在该接口内,用户定义具体的RPC调用逻辑。最后coprocessorService将全部Batch.Call.call()的返回值加入结果集合并返回。

  coprocessorService的第三种方法比第二个方法多了一个参数callback。coprocessorService第二个方法内部使用HBase自带的缺省callback,该缺省 callback将每一个Region的返回结果都添加到一个Map类型的结果集中,并将该集合做为coprocessorService方法的返回值。

  HBase 提供第三种coprocessorService方法容许用户定义callback行为,coprocessorService 会为每个RPC返回结果调用该callback,用户能够在callback 中执行须要的逻辑,好比执行sum累加。用第二种方法的状况下,每一个Region协处理器RPC的返回结果先放入一个列表,全部的 Region 都返回后,用户代码再从该列表中取出每个结果进行累加;用第三种方法,直接在callback中进行累加,省掉了建立结果集合和遍历该集合的开销,效率会更高一些。
  所以咱们只须要额外定义一个callback便可,callback是一个Batch.Callback接口类,用户须要重载其update方法。

public S sum(final HTable table, final ColumnInterpreter<R, S, P, Q, T> ci,final Scan scan)throws Throwable {

final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false);

class SumCallBack implements Batch.Callback  {

S sumVal = null;

public S getSumResult() {
  return sumVal;
}

@Override
public synchronized void update(byte[] region, byte[] row, S result) {
  sumVal = ci.add(sumVal, result);
}}

SumCallBack sumCallBack = new SumCallBack();
table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(),
      new Batch.Call<AggregateService, S>() {
        @Override
        public S call(AggregateService instance) throws IOException {
          ServerRpcController controller = new ServerRpcController();
          BlockingRpcCallback<AggregateResponse> rpcCallback =
                  new BlockingRpcCallback<AggregateResponse>();
            //RPC 调用
          instance.getSum(controller, requestArg, rpcCallback);
          AggregateResponse response = rpcCallback.get();
          if (controller.failedOnException()) {
            throw controller.getFailedOn();
          }
          if (response.getFirstPartCount() == 0) {
            return null;
          }
          ByteString b = response.getFirstPart(0);
          T t = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 4, b);
          S s = ci.getPromotedValueFromProto(t);
          return s;
        }
      }, sumCallBack);
 return sumCallBack.getSumResult();

Observer实现二级索引

Observer相似于传统数据库中的触发器,当发生某些事件的时候这类协处理器会被 Server 端调用。Observer Coprocessor是一些散布在HBase Server端代码的 hook钩子, 在固定的事件发生时被调用。好比:put操做以前有钩子函数prePut,该函数在pu 操做执 行前会被Region Server调用;在put操做以后则有postPut 钩子函数。

RegionObserver工做原理
RegionObserver提供客户端的数据操纵事件钩子,Get、Put、Delete、Scan,使用此功能可以解决主表以及多个索引表之间数据一致性的问题

 

  1. 客户端发出put请求;
  2. 该请求被分派给合适的RegionServer和Region;
  3. coprocessorHost拦截该请求,而后在该表上登记的每一个 RegionObserver 上调用prePut();
  4. 若是没有被preGet()拦截,该请求继续送到 region,而后进行处理;
  5. Region产生的结果再次被CoprocessorHost拦截,调用postGet();
  6. 假如没有postGet()拦截该响应,最终结果被返回给客户端;

  如上图所示,HBase能够根据rowkey很快的检索到数据,可是若是根据column检索数据,首先要根据rowkey减少范围,再经过列过滤器去过滤出数据,若是使用二级索引,能够先查基于column的索引表,获取到rowkey后再快速的检索到数据。

  如图所示首先继承BaseRegionObserver类,重写postPut,postDelete方法,在postPut方法体内中写Put索引表数据的代码,在postDelete方法里面写Delete索引表数据,这样能够保持数据的一致性。
  在Scan表的时候首先判断是否先查索引表,若是不查索引直接scan主表,若是走索引表经过索引表获取主表的rowkey再去查主表。
  使用Elastic Search创建二级索引也是同样。
  咱们在同一个主机集群上同时创建了HBase集群和Elastic Search集群,存储到HBase的数据必须实时地同步到Elastic Search。而刚好HBase和Elastic Search都没有更新的概念,咱们的需求能够简化为两步:

  1. 当一个新的Put操做产生时,将Put数据转化为json,索引到ElasticSearch,并把RowKey做为新文档的ID;
  2. 当一个新的Delete操做产生时获取Delete数据的rowkey删除Elastic Search中对应的ID。

协处理的主要应用场景 

  1. Observer容许集群在正常的客户端操做过程当中能够有不一样的行为表现;
  2. Endpoint容许扩展集群的能力,对客户端应用开放新的运算命令;
  3. Observer相似于RDBMS的触发器,主要在服务端工做;
  4. Endpoint相似于RDBMS的存储过程,主要在服务端工做;
  5. Observer能够实现权限管理、优先级设置、监控、ddl控制、二级索引等功能;
  6. Endpoint能够实现min、max、avg、sum、distinct、group by等功能。
相关文章
相关标签/搜索