Hbase 提供了种类丰富的过滤器(filter)来提升数据处理的效率,用户能够经过内置或自定义的过滤器来对数据进行过滤,全部的过滤器都在服务端生效,即谓词下推(predicate push down)。这样能够保证过滤掉的数据不会被传送到客户端,从而减轻网络传输和客户端处理的压力。html
Filter 接口中定义了过滤器的基本方法,FilterBase 抽象类实现了 Filter 接口。全部内置的过滤器则直接或者间接继承自 FilterBase 抽象类。用户只须要将定义好的过滤器经过 setFilter
方法传递给 Scan
或 put
的实例便可。java
setFilter(Filter filter)
// Scan 中定义的 setFilter @Override public Scan setFilter(Filter filter) { super.setFilter(filter); return this; }
// Get 中定义的 setFilter @Override public Get setFilter(Filter filter) { super.setFilter(filter); return this; }
FilterBase 的全部子类过滤器以下:git
说明:上图基于当前时间点(2019.4)最新的 Hbase-2.1.4 ,下文全部说明均基于此版本。github
HBase 内置过滤器能够分为三类:分别是比较过滤器,专用过滤器和包装过滤器。分别在下面的三个小节中作详细的介绍。正则表达式
全部比较过滤器均继承自 CompareFilter
。建立一个比较过滤器须要两个参数,分别是比较运算符和比较器实例。shell
public CompareFilter(final CompareOp compareOp,final ByteArrayComparable comparator) { this.compareOp = compareOp; this.comparator = comparator; }
比较运算符均定义在枚举类 CompareOperator
中数据库
@InterfaceAudience.Public public enum CompareOperator { LESS, LESS_OR_EQUAL, EQUAL, NOT_EQUAL, GREATER_OR_EQUAL, GREATER, NO_OP, }
注意:在 1.x 版本的 HBase 中,比较运算符定义在
CompareFilter.CompareOp
枚举类中,但在 2.0 以后这个类就被标识为 @deprecated ,并会在 3.0 移除。因此 2.0 以后版本的 HBase 须要使用CompareOperator
这个枚举类。数组
全部比较器均继承自 ByteArrayComparable
抽象类,经常使用的有如下几种:网络
Bytes.compareTo(byte [],byte [])
按字典序比较指定的字节数组。EQUAL
和 NOT_EQUAL
操做。EQUAL
和 NOT_EQUAL
操做。BinaryPrefixComparator
和 BinaryComparator
的区别不是很好理解,这里举例说明一下:ide
在进行 EQUAL
的比较时,若是比较器传入的是 abcd
的字节数组,可是待比较数据是 abcdefgh
:
BinaryPrefixComparator
比较器,则比较以 abcd
字节数组的长度为准,即 efgh
不会参与比较,这时候认为 abcd
与 abcdefgh
是知足 EQUAL
条件的;BinaryComparator
比较器,则认为其是不相等的。比较过滤器共有五个(Hbase 1.x 版本和 2.x 版本相同),见下图:
前四种过滤器的使用方法相同,均只要传递比较运算符和运算器实例便可构建,而后经过 setFilter
方法传递给 scan
:
Filter filter = new RowFilter(CompareOperator.LESS_OR_EQUAL, new BinaryComparator(Bytes.toBytes("xxx"))); scan.setFilter(filter);
DependentColumnFilter
的使用稍微复杂一点,这里单独作下说明。
能够把 DependentColumnFilter
理解为一个 valueFilter 和一个时间戳过滤器的组合。DependentColumnFilter
有三个带参构造器,这里选择一个参数最全的进行说明:
DependentColumnFilter(final byte [] family, final byte[] qualifier, final boolean dropDependentColumn, final CompareOperator op, final ByteArrayComparable valueComparator)
这里举例进行说明:
DependentColumnFilter dependentColumnFilter = new DependentColumnFilter( Bytes.toBytes("student"), Bytes.toBytes("name"), false, CompareOperator.EQUAL, new BinaryPrefixComparator(Bytes.toBytes("xiaolan")));
首先会去查找 student:name
中值以 xiaolan
开头的全部数据得到 参考数据集
,这一步等同于 valueFilter 过滤器;
其次再用参考数据集中全部数据的时间戳去检索其余列,得到时间戳相同的其余列的数据做为 结果数据集
,这一步等同于时间戳过滤器;
最后若是 dropDependentColumn
为 true,则返回 参考数据集
+结果数据集
,若为 false,则抛弃参考数据集,只返回 结果数据集
。
专用过滤器一般直接继承自 FilterBase
,适用于范围更小的筛选规则。
基于某列(参考列)的值决定某行数据是否被过滤。其实例有如下方法:
SingleColumnValueFilter singleColumnValueFilter = new SingleColumnValueFilter( "student".getBytes(), "name".getBytes(), CompareOperator.EQUAL, new SubstringComparator("xiaolan")); singleColumnValueFilter.setFilterIfMissing(true); scan.setFilter(singleColumnValueFilter);
SingleColumnValueExcludeFilter
继承自上面的 SingleColumnValueFilter
,过滤行为与其相反。
基于 RowKey 值决定某行数据是否被过滤。
PrefixFilter prefixFilter = new PrefixFilter(Bytes.toBytes("xxx")); scan.setFilter(prefixFilter);
基于列限定符(列名)决定某行数据是否被过滤。
ColumnPrefixFilter columnPrefixFilter = new ColumnPrefixFilter(Bytes.toBytes("xxx")); scan.setFilter(columnPrefixFilter);
可使用这个过滤器实现对结果按行进行分页,建立 PageFilter 实例的时候须要传入每页的行数。
public PageFilter(final long pageSize) { Preconditions.checkArgument(pageSize >= 0, "must be positive %s", pageSize); this.pageSize = pageSize; }
下面的代码体现了客户端实现分页查询的主要逻辑,这里对其进行一下解释说明:
客户端进行分页查询,须要传递 startRow
(起始 RowKey),知道起始 startRow
后,就能够返回对应的 pageSize 行数据。这里惟一的问题就是,对于第一次查询,显然 startRow
就是表格的第一行数据,可是以后第二次、第三次查询咱们并不知道 startRow
,只能知道上一次查询的最后一条数据的 RowKey(简单称之为 lastRow
)。
咱们不能将 lastRow
做为新一次查询的 startRow
传入,由于 scan 的查询区间是[startRow,endRow) ,即前开后闭区间,这样 startRow
在新的查询也会被返回,这条数据就重复了。
同时在不使用第三方数据库存储 RowKey 的状况下,咱们是没法经过知道 lastRow
的下一个 RowKey 的,由于 RowKey 的设计多是连续的也有多是不连续的。
因为 Hbase 的 RowKey 是按照字典序进行排序的。这种状况下,就能够在 lastRow
后面加上 0
,做为 startRow
传入,由于按照字典序的规则,某个值加上 0
后的新值,在字典序上必定是这个值的下一个值,对于 HBase 来讲下一个 RowKey 在字典序上必定也是等于或者大于这个新值的。
因此最后传入 lastRow
+0
,若是等于这个值的 RowKey 存在就从这个值开始 scan,不然从字典序的下一个 RowKey 开始 scan。
25 个字母以及数字字符,字典排序以下:
'0' < '1' < '2' < ... < '9' < 'a' < 'b' < ... < 'z'
分页查询主要实现逻辑:
byte[] POSTFIX = new byte[] { 0x00 }; Filter filter = new PageFilter(15); int totalRows = 0; byte[] lastRow = null; while (true) { Scan scan = new Scan(); scan.setFilter(filter); if (lastRow != null) { // 若是不是首行 则 lastRow + 0 byte[] startRow = Bytes.add(lastRow, POSTFIX); System.out.println("start row: " + Bytes.toStringBinary(startRow)); scan.withStartRow(startRow); } ResultScanner scanner = table.getScanner(scan); int localRows = 0; Result result; while ((result = scanner.next()) != null) { System.out.println(localRows++ + ": " + result); totalRows++; lastRow = result.getRow(); } scanner.close(); //最后一页,查询结束 if (localRows == 0) break; } System.out.println("total rows: " + totalRows);
须要注意的是在多台 Regin Services 上执行分页过滤的时候,因为并行执行的过滤器不能共享它们的状态和边界,因此有可能每一个过滤器都会在完成扫描前获取了 PageCount 行的结果,这种状况下会返回比分页条数更多的数据,分页过滤器就有失效的可能。
List<Long> list = new ArrayList<>(); list.add(1554975573000L); TimestampsFilter timestampsFilter = new TimestampsFilter(list); scan.setFilter(timestampsFilter);
FirstKeyOnlyFilter
只扫描每行的第一列,扫描完第一列后就结束对当前行的扫描,并跳转到下一行。相比于全表扫描,其性能更好,一般用于行数统计的场景,由于若是某一行存在,则行中必然至少有一列。
FirstKeyOnlyFilter firstKeyOnlyFilter = new FirstKeyOnlyFilter(); scan.set(firstKeyOnlyFilter);
包装过滤器就是经过包装其余过滤器以实现某些拓展的功能。
SkipFilter
包装一个过滤器,当被包装的过滤器遇到一个须要过滤的 KeyValue 实例时,则拓展过滤整行数据。下面是一个使用示例:
// 定义 ValueFilter 过滤器 Filter filter1 = new ValueFilter(CompareOperator.NOT_EQUAL, new BinaryComparator(Bytes.toBytes("xxx"))); // 使用 SkipFilter 进行包装 Filter filter2 = new SkipFilter(filter1);
WhileMatchFilter
包装一个过滤器,当被包装的过滤器遇到一个须要过滤的 KeyValue 实例时,WhileMatchFilter
则结束本次扫描,返回已经扫描到的结果。下面是其使用示例:
Filter filter1 = new RowFilter(CompareOperator.NOT_EQUAL, new BinaryComparator(Bytes.toBytes("rowKey4"))); Scan scan = new Scan(); scan.setFilter(filter1); ResultScanner scanner1 = table.getScanner(scan); for (Result result : scanner1) { for (Cell cell : result.listCells()) { System.out.println(cell); } } scanner1.close(); System.out.println("--------------------"); // 使用 WhileMatchFilter 进行包装 Filter filter2 = new WhileMatchFilter(filter1); scan.setFilter(filter2); ResultScanner scanner2 = table.getScanner(scan); for (Result result : scanner1) { for (Cell cell : result.listCells()) { System.out.println(cell); } } scanner2.close();
rowKey0/student:name/1555035006994/Put/vlen=8/seqid=0 rowKey1/student:name/1555035007019/Put/vlen=8/seqid=0 rowKey2/student:name/1555035007025/Put/vlen=8/seqid=0 rowKey3/student:name/1555035007037/Put/vlen=8/seqid=0 rowKey5/student:name/1555035007051/Put/vlen=8/seqid=0 rowKey6/student:name/1555035007057/Put/vlen=8/seqid=0 rowKey7/student:name/1555035007062/Put/vlen=8/seqid=0 rowKey8/student:name/1555035007068/Put/vlen=8/seqid=0 rowKey9/student:name/1555035007073/Put/vlen=8/seqid=0 -------------------- rowKey0/student:name/1555035006994/Put/vlen=8/seqid=0 rowKey1/student:name/1555035007019/Put/vlen=8/seqid=0 rowKey2/student:name/1555035007025/Put/vlen=8/seqid=0 rowKey3/student:name/1555035007037/Put/vlen=8/seqid=0
能够看到被包装后,只返回了 rowKey4
以前的数据。
以上都是讲解单个过滤器的做用,当须要多个过滤器共同做用于一次查询的时候,就须要使用 FilterList
。FilterList
支持经过构造器或者 addFilter
方法传入多个过滤器。
// 构造器传入 public FilterList(final Operator operator, final List<Filter> filters) public FilterList(final List<Filter> filters) public FilterList(final Filter... filters) // 方法传入 public void addFilter(List<Filter> filters) public void addFilter(Filter filter)
多个过滤器组合的结果由 operator
参数定义 ,其可选参数定义在 Operator
枚举类中。只有 MUST_PASS_ALL
和 MUST_PASS_ONE
两个可选的值:
@InterfaceAudience.Public public enum Operator { /** !AND */ MUST_PASS_ALL, /** !OR */ MUST_PASS_ONE }
使用示例以下:
List<Filter> filters = new ArrayList<Filter>(); Filter filter1 = new RowFilter(CompareOperator.GREATER_OR_EQUAL, new BinaryComparator(Bytes.toBytes("XXX"))); filters.add(filter1); Filter filter2 = new RowFilter(CompareOperator.LESS_OR_EQUAL, new BinaryComparator(Bytes.toBytes("YYY"))); filters.add(filter2); Filter filter3 = new QualifierFilter(CompareOperator.EQUAL, new RegexStringComparator("ZZZ")); filters.add(filter3); FilterList filterList = new FilterList(filters); Scan scan = new Scan(); scan.setFilter(filterList);
HBase: The Definitive Guide _> Chapter 4. Client API: Advanced Features
更多大数据系列文章能够参见 GitHub 开源项目: 大数据入门指南