在使用 HBase 时,若是你的数据量达到了数十亿行或数百万列,此时可否在查询中返回大量数据将受制于网络的带宽,即使网络情况容许,可是客户端的计算处理也未必可以知足要求。在这种状况下,协处理器(Coprocessors)应运而生。它容许你将业务计算代码放入在 RegionServer 的协处理器中,将处理好的数据再返回给客户端,这能够极大地下降须要传输的数据量,从而得到性能上的提高。同时协处理器也容许用户扩展实现 HBase 目前所不具有的功能,如权限校验、二级索引、完整性约束等。html
Observer 协处理器相似于关系型数据库中的触发器,当发生某些事件的时候这类协处理器会被 Server 端调用。一般能够用来实现下面功能:java
Get
或 Put
操做以前,您可使用 preGet
或 prePut
方法检查权限;当前 Observer 协处理器有如下四种类型:node
以上四种类型的 Observer 协处理器均继承自 Coprocessor
接口,这四个接口中分别定义了全部可用的钩子方法,以便在对应方法先后执行特定的操做。一般状况下,咱们并不会直接实现上面接口,而是继承其 Base 实现类,Base 实现类只是简单空实现了接口中的方法,这样咱们在实现自定义的协处理器时,就没必要实现全部方法,只须要重写必要方法便可。git
这里以 RegionObservers
为例,其接口类中定义了全部可用的钩子方法,下面截取了部分方法的定义,多数方法都是成对出现的,有 pre
就有 post
:github
prePut()
拦截,该请求继续送到 region,而后进行处理postPut()
postPut()
拦截该响应,最终结果被返回给客户端若是你们了解 Spring,能够将这种执行方式类比于其 AOP 的执行原理便可,官方文档当中也是这样类比的:redis
If you are familiar with Aspect Oriented Programming (AOP), you can think of a coprocessor as applying advice by intercepting a request and then running some custom code,before passing the request on to its final destination (or even changing the destination).shell
若是您熟悉面向切面编程(AOP),您能够将协处理器视为经过拦截请求而后运行一些自定义代码来使用 Advice,而后将请求传递到其最终目标(或者更改目标)。数据库
Endpoint 协处理器相似于关系型数据库中的存储过程。客户端能够调用 Endpoint 协处理器在服务端对数据进行处理,而后再返回。apache
以汇集操做为例,若是没有协处理器,当用户须要找出一张表中的最大数据,即 max 聚合操做,就必须进行全表扫描,而后在客户端上遍历扫描结果,这必然会加剧了客户端处理数据的压力。利用 Coprocessor,用户能够将求最大值的代码部署到 HBase Server 端,HBase 将利用底层 cluster 的多个节点并发执行求最大值的操做。即在每一个 Region 范围内执行求最大值的代码,将每一个 Region 的最大值在 Region Server 端计算出来,仅仅将该 max 值返回给客户端。以后客户端只须要将每一个 Region 的最大值进行比较而找到其中最大的值便可。编程
要使用咱们本身开发的协处理器,必须经过静态(使用 HBase 配置)或动态(使用 HBase Shell 或 Java API)加载它。
其加载和卸载方式分别介绍以下。
静态加载分如下三步:
hbase-site.xml
定义须要加载的协处理器。<property> <name>hbase.coprocessor.region.classes</name> <value>org.myname.hbase.coprocessor.endpoint.SumEndPoint</value> </property>
<name>
标签的值必须是下面其中之一:
hbase.coprocessor.region.classes
hbase.coprocessor.wal.classes
hbase.coprocessor.master.classes
<value>
必须是协处理器实现类的全限定类名。若是为加载指定了多个类,则类名必须以逗号分隔。
将 jar(包含代码和全部依赖项) 放入 HBase 安装目录中的 lib
目录下;
重启 HBase。
从 hbase-site.xml 中删除配置的协处理器的<property>元素及其子元素;
从类路径或 HBase 的 lib 目录中删除协处理器的 JAR 文件(可选);
重启 HBase。
使用动态加载协处理器,不须要从新启动 HBase。但动态加载的协处理器是基于每一个表加载的,只能用于所指定的表。
此外,在使用动态加载必须使表脱机(disable)以加载协处理器。动态加载一般有两种方式:Shell 和 Java API 。
如下示例基于两个前提:
- coprocessor.jar 包含协处理器实现及其全部依赖项。
- JAR 包存放在 HDFS 上的路径为:hdfs:// <namenode>:<port> / user / <hadoop-user> /coprocessor.jar
hbase > disable 'tableName'
hbase > alter 'tableName', METHOD => 'table_att', 'Coprocessor'=>'hdfs://<namenode>:<port>/ user/<hadoop-user>/coprocessor.jar| org.myname.hbase.Coprocessor.RegionObserverExample|1073741823| arg1=1,arg2=2'
Coprocessor
包含由管道(|)字符分隔的四个参数,按顺序解释以下:
容许使用通配符,例如:hdfs://<namenode>:<port>/user/<hadoop-user>/*.jar
来添加指定的 JAR 包;
可使指定目录,例如:hdfs://<namenode>:<port>/user/<hadoop-user>/
,这会添加目录中的全部 JAR 包,但不会搜索子目录中的 JAR 包。
可选参数 :传递的协处理器的可选参数。
hbase > enable 'tableName'
hbase > describe 'tableName'
协处理器出如今 TABLE_ATTRIBUTES
属性中则表明加载成功。
hbase> disable 'tableName'
hbase> alter 'tableName', METHOD => 'table_att_unset', NAME => 'coprocessor$1'
hbase> enable 'tableName'
TableName tableName = TableName.valueOf("users"); String path = "hdfs://<namenode>:<port>/user/<hadoop-user>/coprocessor.jar"; Configuration conf = HBaseConfiguration.create(); Connection connection = ConnectionFactory.createConnection(conf); Admin admin = connection.getAdmin(); admin.disableTable(tableName); HTableDescriptor hTableDescriptor = new HTableDescriptor(tableName); HColumnDescriptor columnFamily1 = new HColumnDescriptor("personalDet"); columnFamily1.setMaxVersions(3); hTableDescriptor.addFamily(columnFamily1); HColumnDescriptor columnFamily2 = new HColumnDescriptor("salaryDet"); columnFamily2.setMaxVersions(3); hTableDescriptor.addFamily(columnFamily2); hTableDescriptor.setValue("COPROCESSOR$1", path + "|" + RegionObserverExample.class.getCanonicalName() + "|" + Coprocessor.PRIORITY_USER); admin.modifyTable(tableName, hTableDescriptor); admin.enableTable(tableName);
在 HBase 0.96 及其之后版本中,HTableDescriptor 的 addCoprocessor() 方法提供了一种更为简便的加载方法。
TableName tableName = TableName.valueOf("users"); Path path = new Path("hdfs://<namenode>:<port>/user/<hadoop-user>/coprocessor.jar"); Configuration conf = HBaseConfiguration.create(); Connection connection = ConnectionFactory.createConnection(conf); Admin admin = connection.getAdmin(); admin.disableTable(tableName); HTableDescriptor hTableDescriptor = new HTableDescriptor(tableName); HColumnDescriptor columnFamily1 = new HColumnDescriptor("personalDet"); columnFamily1.setMaxVersions(3); hTableDescriptor.addFamily(columnFamily1); HColumnDescriptor columnFamily2 = new HColumnDescriptor("salaryDet"); columnFamily2.setMaxVersions(3); hTableDescriptor.addFamily(columnFamily2); hTableDescriptor.addCoprocessor(RegionObserverExample.class.getCanonicalName(), path, Coprocessor.PRIORITY_USER, null); admin.modifyTable(tableName, hTableDescriptor); admin.enableTable(tableName);
卸载其实就是从新定义表但不设置协处理器。这会删除全部表上的协处理器。
TableName tableName = TableName.valueOf("users"); String path = "hdfs://<namenode>:<port>/user/<hadoop-user>/coprocessor.jar"; Configuration conf = HBaseConfiguration.create(); Connection connection = ConnectionFactory.createConnection(conf); Admin admin = connection.getAdmin(); admin.disableTable(tableName); HTableDescriptor hTableDescriptor = new HTableDescriptor(tableName); HColumnDescriptor columnFamily1 = new HColumnDescriptor("personalDet"); columnFamily1.setMaxVersions(3); hTableDescriptor.addFamily(columnFamily1); HColumnDescriptor columnFamily2 = new HColumnDescriptor("salaryDet"); columnFamily2.setMaxVersions(3); hTableDescriptor.addFamily(columnFamily2); admin.modifyTable(tableName, hTableDescriptor); admin.enableTable(tableName);
这里给出一个简单的案例,实现一个相似于 Redis 中 append
命令的协处理器,当咱们对已有列执行 put 操做时候,HBase 默认执行的是 update 操做,这里咱们修改成执行 append 操做。
# redis append 命令示例 redis> EXISTS mykey (integer) 0 redis> APPEND mykey "Hello" (integer) 5 redis> APPEND mykey " World" (integer) 11 redis> GET mykey "Hello World"
# 建立一张杂志表 有文章和图片两个列族 hbase > create 'magazine','article','picture'
完整代码可见本仓库:hbase-observer-coprocessor
新建 Maven 工程,导入下面依赖:
<dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-common</artifactId> <version>1.2.0</version> </dependency> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-server</artifactId> <version>1.2.0</version> </dependency>
继承 BaseRegionObserver
实现咱们自定义的 RegionObserver
,对相同的 article:content
执行 put 命令时,将新插入的内容添加到原有内容的末尾,代码以下:
public class AppendRegionObserver extends BaseRegionObserver { private byte[] columnFamily = Bytes.toBytes("article"); private byte[] qualifier = Bytes.toBytes("content"); @Override public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException { if (put.has(columnFamily, qualifier)) { // 遍历查询结果,获取指定列的原值 Result rs = e.getEnvironment().getRegion().get(new Get(put.getRow())); String oldValue = ""; for (Cell cell : rs.rawCells()) if (CellUtil.matchingColumn(cell, columnFamily, qualifier)) { oldValue = Bytes.toString(CellUtil.cloneValue(cell)); } // 获取指定列新插入的值 List<Cell> cells = put.get(columnFamily, qualifier); String newValue = ""; for (Cell cell : cells) { if (CellUtil.matchingColumn(cell, columnFamily, qualifier)) { newValue = Bytes.toString(CellUtil.cloneValue(cell)); } } // Append 操做 put.addColumn(columnFamily, qualifier, Bytes.toBytes(oldValue + newValue)); } } }
使用 maven 命令进行打包,打包后的文件名为 hbase-observer-coprocessor-1.0-SNAPSHOT.jar
# mvn clean package
# 上传项目到HDFS上的hbase目录 hadoop fs -put /usr/app/hbase-observer-coprocessor-1.0-SNAPSHOT.jar /hbase # 查看上传是否成功 hadoop fs -ls /hbase
hbase > disable 'magazine'
hbase > alter 'magazine', METHOD => 'table_att', 'Coprocessor'=>'hdfs://hadoop001:8020/hbase/hbase-observer-coprocessor-1.0-SNAPSHOT.jar|com.heibaiying.AppendRegionObserver|1001|'
hbase > enable 'magazine'
hbase > desc 'magazine'
协处理器出如今 TABLE_ATTRIBUTES
属性中则表明加载成功,以下图:
插入一组测试数据:
hbase > put 'magazine', 'rowkey1','article:content','Hello' hbase > get 'magazine','rowkey1','article:content' hbase > put 'magazine', 'rowkey1','article:content','World' hbase > get 'magazine','rowkey1','article:content'
能够看到对于指定列的值已经执行了 append 操做:
插入一组对照数据:
hbase > put 'magazine', 'rowkey1','article:author','zhangsan' hbase > get 'magazine','rowkey1','article:author' hbase > put 'magazine', 'rowkey1','article:author','lisi' hbase > get 'magazine','rowkey1','article:author'
能够看到对于正常的列仍是执行 update 操做:
hbase > disable 'magazine'
hbase > alter 'magazine', METHOD => 'table_att_unset', NAME => 'coprocessor$1'
hbase > enable 'magazine'
hbase > desc 'magazine'
依次执行下面命令能够测试卸载是否成功
hbase > get 'magazine','rowkey1','article:content' hbase > put 'magazine', 'rowkey1','article:content','Hello' hbase > get 'magazine','rowkey1','article:content'
更多大数据系列文章能够参见 GitHub 开源项目: 大数据入门指南