hbase自定义过滤器

1. 下载protobuf-2.5.0解压,若是是window下,额外下载protoc-2.5.0-win32,解压,将protoc.exe放在protobuf-2.5.0下的src目录下html

2. 配置环境变量,添加path路径指向protobuf目录的src中java

3. 查看当前版本,在命令提示符中输入命令express

4. 建立一个空白的文本文件 命名为 CustomNumberComparator.proto  即 后缀文件类型为protoapache

 

5. 用记事本打开CustomNumberComparator.proto文件输入如下内容编程

 

 

 
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

// This file contains protocol buffers that are used for filters

option java_package = "com.pateo.hbase.defined.comparator";//生成java代码的包名
option java_outer_classname = "MyComparatorProtos";//生成的类名
option java_generic_services = true;
option java_generate_equals_and_hash = true;
option optimize_for = SPEED;

// This file contains protocol buffers that are used for comparators (e.g. in filters)

message CustomNumberComparator {
    required bytes value = 1;     //自定义比较器中需序列化的字段
    required string fieldType = 2;//自定义比较器中需序列化的字段
}

 

 

6. 进入命令提示符,使用命令读取CustomNumberComparator.proto的内容生成java代码,即自定义比较器的序列化类数组

  内容:   protoc.exe -I=C:/proto --java_out=C:/proto C:/proto/CustomNumberComparator.protoapp

 

输入后会在指定的/protoc中生成一个文件夹less

获得自定义比较器的序列化类ide

7. 将生成的文件夹拷贝到idea编程工具中,注意粘贴的路径为java下工具

8. 新建一个自定义过滤器类CustomNumberComparator

9. CustomNumberComparator继承ByteArrayComparable类,重写方法,代码以下

 

package com.pateo.hbase.defined.comparator;

import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
import org.apache.hadoop.hbase.util.Bytes;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;

import java.util.Locale;

/**
 * 自定义比较器:使用方法见 CompareTest
 *
 * @param : fieldType 传递数据格式的类型,支持的数据类型:double
 * @param : data 经过Bytes转换获得的字节数组 使用注意事项 : 使用的时候要注意数据类型的匹配问题
 */
public class CustomNumberComparator extends ByteArrayComparable {
    /**
     * 目前只支持 double类型
     */
    private String fieldType;
    private byte[] data;

    /**
     * Constructor
     *
     * @param value
     * @param fieldType
     */
    public CustomNumberComparator(byte[] value, String fieldType) {
        super(value);
        this.fieldType = fieldType;
        this.fieldType = "String";//只支持
        this.data = value;
    }

    @Override
    // 重写该方法
    public byte[] toByteArray() {

        MyComparatorProtos.CustomNumberComparator.Builder builder = MyComparatorProtos.CustomNumberComparator
                .newBuilder();
        builder.setValue(ByteString.copyFrom(this.data));
        builder.setFieldType(this.fieldType);
        return builder.build().toByteArray();
    }

    // 定义该方法,用于对象反序列化操做
    public static CustomNumberComparator parseFrom(final byte[] bytes)
            throws DeserializationException {
        MyComparatorProtos.CustomNumberComparator proto = null;
        try {
            proto = MyComparatorProtos.CustomNumberComparator.parseFrom(bytes);
        } catch (InvalidProtocolBufferException e) {
            throw new DeserializationException(e);
        }
        return new CustomNumberComparator(proto.getValue().toByteArray(),
                proto.getFieldType());
    }

    // 重写比较方法 里面就能够按照本身的意愿来实现本身的比较器
    @Override
    public int compareTo(byte[] bytes, int offset, int length) {

        if (fieldType.equalsIgnoreCase("String")) {
            String Rowkey = Bytes.toString(bytes, offset, length).toLowerCase(Locale.ROOT);//获得rowkey
            String substring1 = Rowkey.substring(1, 5);
            String substring2 = Rowkey.substring(17, 22);
            String paramValue = byteConvertObj(String.class, this.data);
            String[] split = paramValue.split(",");
            if (substring1.contains(split[0]) && substring2.contains(split[1])) {
                return 0;
            } else {
                return 1;
            }
        }
        return 1;
    }

    private <T> T byteConvertObj(Class<T> clazz, byte[] data) { String clazzName = clazz.getSimpleName(); if (clazzName.equalsIgnoreCase("Integer")) { Integer paramValue; try { paramValue = Bytes.toInt(data); } catch (IllegalArgumentException e) { paramValue = Integer.valueOf(Bytes.toString(data)); } return (T) paramValue; } else if (clazzName.equalsIgnoreCase("Long")) { Long paramValue; try { paramValue = Bytes.toLong(data); } catch (IllegalArgumentException e) { paramValue = Long.valueOf(Bytes.toString(data)); } return (T) paramValue; } else if (clazzName.equalsIgnoreCase("Float")) { Float paramValue; try { paramValue = Bytes.toFloat(data); } catch (IllegalArgumentException e) { paramValue = Float.valueOf(Bytes.toString(data)); } return (T) paramValue; } else if (clazzName.equalsIgnoreCase("Double")) { Double paramValue; try { paramValue = Bytes.toDouble(data); } catch (IllegalArgumentException e) { paramValue = Double.valueOf(Bytes.toString(data)); } return (T) paramValue; } else if (clazzName.equalsIgnoreCase("Short")) { Short paramValue; try { paramValue = Bytes.toShort(data); } catch (IllegalArgumentException e) { paramValue = Short.valueOf(Bytes.toString(data)); } return (T) paramValue; } return (T) Bytes.toString(data); } }

10.核心内容为compareTo方法的内容,即为过滤的逻辑实现

    @Override
    public int compareTo(byte[] bytes, int offset, int length) {

        if (fieldType.equalsIgnoreCase("String")) {
            //HbaseValue是在Hbase上搜索到的一条数据
            String HbaseValue = Bytes.toString(bytes, offset, length).toLowerCase(Locale.ROOT);
            String substring1 = HbaseValue.substring(1, 5);
            String substring2 = HbaseValue.substring(17, 22);
            String ClientValue = byteConvertObj(String.class, this.data);//客户端传入的过滤内容
            String[] split = ClientValue.split(",");
            
            if (substring1.contains(split[0]) && substring2.contains(split[1])) {//是否须要过滤
                return 0;//选择
            } else {
                return 1;//过滤
            }
        }
        return 1;//过滤
    }

11.将这个项目打成jar架包放入hbase根目录中的lib下

 

 

 

选中以后自动打包成jar

将这个架包发送到hbase的lib目录中,重启hbase

12. 使用自定义类查询结果

代码以下

 

/**
 * 过滤器实现了类介绍
 * 行键过滤 RowFilter
 * 列簇名过滤 FamilyFilter
 * 列过滤 QualifierFilter
 * 值过滤 ValueFilter
 */

public static void main(String[] args) throws IOException {
    long time1 = new Date().getTime();
    //分页查询
    String page = getPage("00003", 2);//分页查询传入起始位置与返回数量
    System.out.println("下一个起始页码" + page);
    long time2 = new Date().getTime();
    long l = time2 - time1;
    System.out.println("[ "+l/1000+" s ]");
}

/**
 *
 * @param lastRowkey  起始行键
 * @param page 页码
 * @return 下一次查询的起始行键
 * @throws IOException
 */

public static String getPage(String lastRowkey, Integer page) throws IOException {
    //配置参数
    Configuration conf = HBaseConfiguration.create();
    conf.set("hbase.zookeeper.quorum", "mini1");
    conf.set("hbase.zookeeper.property.clientPort", "2181");
    conf.set("hbase.master", "mini1:6000");

    //建立查询类
    Scan scan = new Scan();
    //定义过滤器
    Filter filter1 = new RowFilter(CompareFilter.CompareOp.EQUAL, new BinaryPrefixComparator("999".getBytes()));//前缀查询
    Filter filter = new RowFilter(CompareFilter.CompareOp.EQUAL, new CustomNumberComparator("99,034".getBytes(),"String"));//包含查询
    //过滤器组合
    FilterList filterlist = new FilterList();
    //filterlist.addFilter(filter1);
    //filterlist.addFilter(filter1);
    filterlist.addFilter(filter);

    //组合查询
    scan.setFilter(filterlist);
    //scan.setStartRow(lastRowkey.getBytes());//分页查询使用的key为下一个其实位置,须要添加0
    long time1 = new Date().getTime();
    HTable table = new HTable(conf, "table");
    long time2 = new Date().getTime();
    System.out.println("建立 HTable:"+(time2-time1)/1000.0+"s");
    ResultScanner scanner = table.getScanner(scan);//获取全部结果集
    long time3 = new Date().getTime();
    System.out.println("获取ResultScanner :"+(time3-time1)/1000.0+"s");
    String format = "  %-40s%-14s%-35s%-10s";//输出格式
    System.out.println(String.format("%-40s%-14s", "ROW", "COLUMN+CELL"));//格式化输出
    Long index = 0L;
    for (Result res : scanner) {//获取一行数据
        index++;
        for (Cell cell : res.listCells()) {//获取各个列的值
            String row = Bytes.toString(CellUtil.cloneRow(cell));//行键
            String value = "value=" + Bytes.toString(CellUtil.cloneValue(cell));//值
            String family = Bytes.toString(CellUtil.cloneFamily(cell));//列簇
            String col = Bytes.toString(CellUtil.cloneQualifier(cell));//列名
            String column = "column=" + family + ":" + col;//列簇与列
            String timestamp = "timestamp=" + cell.getTimestamp();//时间戳
            System.out.println(String.format(format, row, column, timestamp, value));//格式化输出
        }
        //lastRowkey = Bytes.toString(res.getRow()) + "0";//给下一次查询起始rowkey位置赋值
    }
    long time4 = new Date().getTime();
    System.out.println(String.format("一共 %d 条数据",index));
    System.out.println("输出耗时 :"+(time4-time3)/1000.0+"s");
    //防止程序异常,这里须要try-catch关闭链接
    scanner.close();
    table.close();

    //返回下一次查询的起始行键,用于翻页

    return lastRowkey;
}

 

  hbase经过网关访问即thrift2      <hbase_home>/bin/hbase thrift2/thrift start    启动

相关文章
相关标签/搜索