1.原由(Why HBase Coprocessor)java
HBase做为列族数据库最常常被人诟病的特性包括:没法轻易创建“二级索引”,难以执行求和、计数、排序等操做。好比,在旧版本的(<0.92)Hbase中,统计数据表的总行数,须要使用Counter方法,执行一次MapReduce Job才能获得。虽然HBase在数据存储层中集成了MapReduce,可以有效用于数据表的分布式计算。然而在不少状况下,作一些简单的相加或者聚合计算的时候,若是直接将计算过程放置在server端,可以减小通信开销,从而得到很好的性能提高。因而,HBase在0.92以后引入了协处理器(coprocessors),实现一些激动人心的新特性:可以轻易创建二次索引、复杂过滤器(谓词下推)以及访问控制等。sql
2.灵感来源( Source of Inspration)数据库
HBase协处理器的灵感来自于Jeff Dean 09年的演讲( P66-67)。它根据该演讲实现了相似于bigtable的协处理器,包括如下特性:apache
每一个表服务器的任意子表均可以运行代码 编程
客户端的高层调用接口(客户端可以直接访问数据表的行地址,多行读写会自动分片成多个并行的RPC调用) 服务器
提供一个很是灵活的、可用于创建分布式服务的数据模型 架构
可以自动化扩展、负载均衡、应用请求路由 并发
HBase的协处理器灵感来自bigtable,可是实现细节不尽相同。HBase创建了一个框架,它为用户提供类库和运行时环境,使得他们的代码可以在HBase region server和master上处理。负载均衡
3.细节剖析(Implementation)框架
协处理器分两种类型,系统协处理器能够全局导入region server上的全部数据表,表协处理器便是用户能够指定一张表使用协处理器。协处理器框架为了更好支持其行为的灵活性,提供了两个不一样方面的插件。一个是观察者(observer),相似于关系数据库的触发器。另外一个是终端(endpoint),动态的终端有点像存储过程。3.1观察者(Observer)
观察者的设计意图是容许用户经过插入代码来重载协处理器框架的upcall方法,而具体的事件触发的callback方法由HBase的核心代码来执行。协处理器框架处理全部的callback调用细节,协处理器自身只须要插入添加或者改变的功能。
以HBase0.92版本为例,它提供了三种观察者接口:
RegionObserver:提供客户端的数据操纵事件钩子:Get、Put、Delete、Scan等。
WALObserver:提供WAL相关操做钩子。
MasterObserver:提供DDL-类型的操做钩子。如建立、删除、修改数据表等。
这些接口能够同时使用在同一个地方,按照不一样优先级顺序执行.用户能够任意基于协处理器实现复杂的HBase功能层。HBase有不少种事件能够触发观察者方法,这些事件与方法从HBase0.92版本起,都会集成在HBase API中。不过这些API可能会因为各类缘由有所改动,不一样版本的接口改动比较大,具体参考Java Doc。
RegionObserver工做原理,如图1所示。更多关于Observer细节请参见HBaseBook的第 9.6.3章节。
3.2终端(Endpoint)
终端是动态RPC插件的接口,它的实现代码被安装在服务器端,从而可以经过HBase RPC唤醒。客户端类库提供了很是方便的方法来调用这些动态接口,它们能够在任意时候调用一个终端,它们的实现代码会被目标region远程执行,结果会返回到终端。用户能够结合使用这些强大的插件接口,为HBase添加全新的特性。终端的使用,以下面流程所示:
单个region:HTableInterface.coprocessorProxy(Class protocol, byte[] row) 。 rigons区域:HTableInterface.coprocessorExec(Class protocol, byte[] startKey, byte[] endKey, Batch.Call callable)
总体的终端调用过程范例,所示:
4.编程实践(Code Example)
在该实例中,咱们经过计算HBase表中行数的一个实例,来真实感觉协处理器 的方便和强大。在旧版的HBase咱们须要编写MapReduce代码来汇总数据表中的行数,在0.92以上的版本HBase中,只须要编写客户端的代码便可实现,很是适合用在WebService的封装上。
4.1启用协处理器 Aggregation(Enable Coprocessor Aggregation)
咱们有两个方法:
1.启动全局aggregation,能过操纵全部的表上的数据。经过修改hbase-site.xml这个文件来实现,只须要添加以下代码:
<property> <name>hbase.coprocessor.user.region.classes</name> <value>org.apache.hadoop.hbase.coprocessor.AggregateImplementation</value> </property>
2.启用表aggregation,只对特定的表生效。经过HBase Shell 来实现。
disable 'xyz' alter 'xyz','coprocessor'=>'|org.apache.hadoop.hbase.coprocessor.example.RowCountEndpoint||' enable 'xyz'
package hbase.learn.com; import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.HTable; public class RowCount { public static void main(String[] args) { String tableName="xyz"; // Get instance of Configuration Configuration configuration = HBaseConfiguration.create(); // Get table instance HTable table = new HTable(configuration, tableName); final ExampleProtos.CountRequest request = ExampleProtos.CountRequest.getDefaultInstance(); Map<byte[],Long> results = table.coprocessorService(ExampleProtos.RowCountService.class, null, null, new Batch.Call<ExampleProtos.RowCountService,Long>() { public Long call(ExampleProtos.RowCountService counter) throws IOException { ServerRpcController controller = new ServerRpcController(); BlockingRpcCallback<ExampleProtos.CountResponse> rpcCallback = new BlockingRpcCallback<ExampleProtos.CountResponse>(); counter.getRowCount(controller, request, rpcCallback); ExampleProtos.CountResponse response = rpcCallback.get(); if (controller.failedOnException()) { throw controller.getFailedOn(); } return (response != null && response.hasCount()) ? response.getCount() : 0; } }); long sum=0; int count=0; for(Long l:results.values()){ sum+=l; count++; } System.out.println("row count = "+sum); System.out.println("region count = "+count); } }
欢迎工做一到五年的Java工程师朋友们加入Java架构开发: 855835163
群内提供免费的Java架构学习资料(里面有高可用、高并发、高性能及分布式、Jvm性能调优、Spring源码,MyBatis,Netty,Redis,Kafka,Mysql,Zookeeper,Tomcat,Docker,Dubbo,Nginx等多个知识点的架构资料)合理利用本身每一分每一秒的时间来学习提高本身,不要再用"没有时间“来掩饰本身思想上的懒惰!趁年轻,使劲拼,给将来的本身一个交代!