hbase源码系列(二)HTable 探秘

  hbase的源码终于搞一个段落了,在接下来的一个月,着重于把看过的源码提炼一下,对一些有意思的主题进行分享一下。继上一篇讲了负载均衡以后,这一篇咱们从client开始讲吧,从client到master再到region server,按照这个顺序来开展,网友也能够对本身感兴趣的部分给我留言或者直接联系个人QQ。html

  如今咱们讲一下HTable吧,为何讲HTable,由于这是咱们最多见的一个类,这是咱们对hbase中数据的操做的入口。正则表达式

  

1.Put操做

  下面是一个很简单往hbase插入一条记录的例子。缓存

HBaseConfiguration conf =  (HBaseConfiguration) HBaseConfiguration.create();
byte[] rowkey = Bytes.toBytes("cenyuhai");
byte[] family = Bytes.toBytes("f");
byte[] qualifier = Bytes.toBytes("name");
byte[] value = Bytes.toBytes("岑玉海");
        
HTable table = new HTable(conf, "test");
Put put = new Put(rowkey);
put.add(family,qualifier,value);
        
table.put(put);
View Code

  咱们日常就是采用这种方式提交的数据,为了提升重用性采用HTablePool,最新的API推荐使用HConnection.getTable("test")来得到HTable,旧的HTablePool已经被抛弃了。好,咱们下面开始看看HTable内部是如何实现的吧,首先咱们看看它内部有什么属性。多线程

  /** 实际提交数据所用的类 */
protected
HConnection connection;/** 须要提交的数据的列表 */ protected List<Row> writeAsyncBuffer = new LinkedList<Row>();
/** flush的size */
private long writeBufferSize; /** 是否自动flush */ private boolean autoFlush; /** 当前的数据的size,达到指定的size就要提交 */ protected long currentWriteBufferSize; protected int scannerCaching; private int maxKeyValueSize; private ExecutorService pool; // For Multi
/** 异步提交 */ protected AsyncProcess<Object> ap;
** rpc工厂 */
private RpcRetryingCallerFactory rpcCallerFactory;

  主要是靠上面的这些家伙来干活的,这里面的connection、ap、rpcCallerFactory是用来和后台通讯的,HTable只是作一个操做,数据进来以后,添加到writeAsyncBuffer,知足条件就flush。负载均衡

  下面看看table.put是怎么执行的:异步

    doPut(put);
    if (autoFlush) {
      flushCommits();
    }

  执行put操做,若是是autoFush,就提交,先看doPut的过程,若是以前的ap异步提交到有问题,就先进行后台提交,不过此次是同步的,若是没有错误,就把put添加到队列当中,而后检查一下当前的 buffer的大小,超过咱们设置的内容的时候,就flush掉。async

if (ap.hasError()){
      backgroundFlushCommits(true);
}
currentWriteBufferSize += put.heapSize();
writeAsyncBuffer.add(put);
while (currentWriteBufferSize > writeBufferSize) {
    backgroundFlushCommits(false);
}

  写下来,让咱们看看backgroundFlushCommits这个方法吧,它的核心就这么一句ap.submit(writeAsyncBuffer, true) ,若是出错了的话,就报错了。因此网上全部关于客户端调优的方法里面无非就这么几种:ide

1)关闭autoFlush优化

2)关闭wal日志ui

3)把writeBufferSize设大一点,通常说是设置成5MB

  通过实践,就第二条关闭日志的效果比较明显,其它的效果都不明显,由于提交的过程是异步的,因此提交的时候占用的时间并很少,提交到server端后,server还有一个写入的队列,(⊙o⊙)… 让人想起小米手机那恶心的排队了。。。因此大规模写入数据,别期望着用put来解决。。。mapreduce生成hfile,而后用bulk load的方式比较好。

  不废话了,咱们继续追踪ap.submit方法吧,F3进去。

  

      int posInList = -1;
      Iterator<? extends Row> it = rows.iterator();
      while (it.hasNext()) {
        Row r = it.next();
        //为row定位
        HRegionLocation loc = findDestLocation(r, 1, posInList);

        if (loc != null && canTakeOperation(loc, regionIncluded, serverIncluded)) {
          // loc is null if there is an error such as meta not available.
          Action<Row> action = new Action<Row>(r, ++posInList);
          retainedActions.add(action);
          addAction(loc, action, actionsByServer);
          it.remove();
        }
      }
View Code

  循环遍历r,为每一个r找到它的位置loc,loc是HRegionLocation,里面记录着这行记录所在的目标region所在的位置,loc怎么得到呢,走进findDestLocation方法里面,看到了这么一句。

  

loc = hConnection.locateRegion(this.tableName, row.getRow());

  经过表名和rowkey,使用HConnection就能够定位到它的位置,这里就先不讲定位了,稍后放一节出来说,请看这一篇《Client如何找到正确的Region Server》,不然篇幅太长了,这里咱们只须要记住,提交操做,是要知道它对应的region在哪里的。

  定位到它的位置以后,它把loc添加到了actionsByServer,一个region server对应一组操做。(插句题外话为何这里叫action呢,其实咱们熟知的Put、Delete,以及不经常使用的Append、Increment都是继承自Row的,在接口传递时候,其实都是视为一种操做,到了后台以后,才作区分)。

  接下来,就是多线程的rpc提交了。

MultiServerCallable<Row> callable = createCallable(loc, multiAction);
......
res = createCaller(callable).callWithoutRetries(callable);

  再深挖一点,把它们的实现都扒出来吧。

  protected MultiServerCallable<Row> createCallable(final HRegionLocation location,
      final MultiAction<Row> multi) {
    return new MultiServerCallable<Row>(hConnection, tableName, location, multi);
  }

  protected RpcRetryingCaller<MultiResponse> createCaller(MultiServerCallable<Row> callable) {
    return rpcCallerFactory.<MultiResponse> newCaller();
  }

  ok,看到了,先构造一个MultiServerCallable,而后再经过rpcCallerFactory作最后的call操做。

  好了,到这里再总结一下put操做吧,前面写得有点儿凌乱了。

  (1)把put操做添加到writeAsyncBuffer队列里面,符合条件(自动flush或者超过了阀值writeBufferSize)就经过AsyncProcess异步批量提交。

  (2)在提交以前,咱们要根据每一个rowkey找到它们归属的region server,这个定位的过程是经过HConnection的locateRegion方法得到的,而后再把这些rowkey按照HRegionLocation分组。

  (3)经过多线程,一个HRegionLocation构造MultiServerCallable<Row>,而后经过rpcCallerFactory.<MultiResponse> newCaller()执行调用,忽略掉失败从新提交和错误处理,客户端的提交操做到此结束。

  

2.Delete操做

  对于Delete,咱们也能够经过如下代码执行一个delete操做

Delete del = new Delete(rowkey);
table.delete(del);

  这个操做比较干脆,new一个RegionServerCallable<Boolean>,直接走rpc了,爽快啊。

RegionServerCallable<Boolean> callable = new RegionServerCallable<Boolean>(connection,
        tableName, delete.getRow()) {
      public Boolean call() throws IOException {
        try {
          MutateRequest request = RequestConverter.buildMutateRequest(
            getLocation().getRegionInfo().getRegionName(), delete);
          MutateResponse response = getStub().mutate(null, request);
          return Boolean.valueOf(response.getProcessed());
        } catch (ServiceException se) {
          throw ProtobufUtil.getRemoteException(se);
        }
      }
    };
rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
View Code

  这里面注意一下这行MutateResponse response = getStub().mutate(null, request);

  getStub()返回的是一个ClientService.BlockingInterface接口,实现这个接口的类是HRegionServer,这样子咱们就知道它在服务端执行了HRegionServer里面的mutate方法。

3.Get操做

  get操做也和delete同样简单

  

Get get = new Get(rowkey);
Result row = table.get(get);

  get操做也没几行代码,仍是直接走的rpc

public Result get(final Get get) throws IOException {
    RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
        getName(), get.getRow()) {
      public Result call() throws IOException {
        return ProtobufUtil.get(getStub(), getLocation().getRegionInfo().getRegionName(), get);
      }
    };
    return rpcCallerFactory.<Result> newCaller().callWithRetries(callable, this.operationTimeout);
}
View Code

  注意里面的ProtobufUtil.get操做,它实际上是构建了一个GetRequest,须要的参数是regionName和get,而后走HRegionServer的get方法,返回一个GetResponse

public static Result get(final ClientService.BlockingInterface client,
      final byte[] regionName, final Get get) throws IOException {
    GetRequest request =
      RequestConverter.buildGetRequest(regionName, get);
    try {
      GetResponse response = client.get(null, request);
      if (response == null) return null;
      return toResult(response.getResult());
    } catch (ServiceException se) {
      throw getRemoteException(se);
    }
}
View Code

 

 4.批量操做

  

  针对put、delete、get都有相应的操做的方式:

  1.Put(list)操做,不少童鞋觉得这个能够提升写入速度,其实无效。。。为啥?由于你构造了一个list进去,它再遍历一下list,执行doPut操做。。。。反而还慢点。

  2.delete和get的批量操做走的都是connection.processBatchCallback(actions, tableName, pool, results, callback),具体的实如今HConnectionManager的静态类HConnectionImplementation里面,结果咱们惊人的发现:

AsyncProcess<?> asyncProcess = createAsyncProcess(tableName, pool, cb, conf);
asyncProcess.submitAll(list);
asyncProcess.waitUntilDone();

  它走的仍是put同样的操做,既然是同样的,何苦代码写得那么绕呢?

5.查询操做

  如今讲一下scan吧,这个操做相对复杂点。仍是老规矩,先上一下代码吧。

        Scan scan = new Scan();
        //scan.setTimeRange(new Date("20140101").getTime(), new Date("20140429").getTime());
        scan.setBatch(10);
        scan.setCaching(10);
        scan.setStartRow(Bytes.toBytes("cenyuhai-00000-20140101"));
        scan.setStopRow(Bytes.toBytes("cenyuhai-zzzzz-201400429"));
        //若是设置为READ_COMMITTED,它会取当前的时间做为读的检查点,在这个时间点以后的就排除掉了
        scan.setIsolationLevel(IsolationLevel.READ_COMMITTED);
        RowFilter rowFilter = new RowFilter(CompareOp.EQUAL, new RegexStringComparator("pattern"));
        ResultScanner resultScanner = table.getScanner(scan);
        Result result = null;
        while ((result = resultScanner.next()) != null) {
            //本身处理去吧...
        }

 

  这个是带正则表达式的模糊查询的scan查询,Scan这个类是包括咱们查询全部须要的参数,batch和caching的设置,在个人另一篇文章里面有写《hbase客户端设置缓存优化查询》

Scan查询的时候,设置StartRow和StopRow但是重头戏,假设我这里要查我01月01日到04月29日总共发了多少业务,中间是业务类型,可是我多是全部的都查,或者只查一部分,在全部都查的状况下,我就不能设置了,那可是StartRow和StopRow我不能空着啊,因此这里能够填00000-zzzzz,只要保证它在这个区间就能够了,而后咱们加了一个RowFilter,而后引入了正则表达式,以前好多人一直在问啊问的,不过我这个例子,其实不要也能够,由于是查全部业务的,在StartRow和StopRow之间的均可以要。

  好的,咱们接着看,F3进入getScanner方法

if (scan.isSmall()) {
      return new ClientSmallScanner(getConfiguration(), scan, getName(), this.connection);
}
return new ClientScanner(getConfiguration(), scan, getName(), this.connection);

  这个scan还分大小, 不要紧,咱们进入ClientScanner看一下吧, 在ClientScanner的构造方法里面发现它会去调用nextScanner去初始化一个ScannerCallable。好的,咱们接着来到ScannerCallable里面,这里须要注意的是它的两个方法,prepare和call方法。在prepare里面它主要干了两个事情,得到region的HRegionLocation和ClientService.BlockingInterface接口的实例,以前说过这个继承这个接口的只有Region Server的实现类。

  public void prepare(final boolean reload) throws IOException {
    this.location = connection.getRegionLocation(tableName, row, reload);    //HConnection.getClient()这个方法简直就是神器啊
    setStub(getConnection().getClient(getLocation().getServerName()));
  }

  ok,咱们下面看看call方法吧

  public Result [] call() throws IOException {
     // 第一次走的地方,开启scanner
      if (scannerId == -1L) {
        this.scannerId = openScanner();
      } else {
        Result [] rrs = null;
        ScanRequest request = null;
        try {
          request = RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq);
          ScanResponse response = null;       
      // 准备用controller去携带返回的数据,这样的话就不用进行protobuf的序列化了
      PayloadCarryingRpcController controller
= new PayloadCarryingRpcController();
      controller.setPriority(getTableName()); response
= getStub().scan(controller, request); nextCallSeq++; long timestamp = System.currentTimeMillis(); // Results are returned via controller CellScanner cellScanner = controller.cellScanner(); rrs = ResponseConverter.getResults(cellScanner, response);     } catch (IOException e) {              
}     
    }
    return rrs; } return null; }

 

   在call方法里面,咱们能够看得出来,实例化ScanRequest,而后调用scan方法的时候把PayloadCarryingRpcController传过去,这里跟踪了一下,若是设置了codec的就从PayloadCarryingRpcController里面返回结果,不然从response里面返回。

  好的,下面看next方法吧。

    @Override
    public Result next() throws IOException { if (cache.size() == 0) {
        Result [] values = null;
        long remainingResultSize = maxScannerResultSize;
        int countdown = this.caching;      
     // 设置获取数据的条数
     callable.setCaching(this.caching); boolean skipFirst = false; boolean retryAfterOutOfOrderException = true; do {       if (skipFirst) {          // 上次读的最后一个,此次就不读了,直接跳过就是了 callable.setCaching(1); values = this.caller.callWithRetries(callable); callable.setCaching(this.caching); skipFirst = false; }
       values = this.caller.callWithRetries(callable);    if (values != null && values.length > 0) { for (Result rs : values) {          //缓存起来 cache.add(rs); for (Cell kv : rs.rawCells()) {//计算出keyvalue的大小,而后减去 remainingResultSize -= KeyValueUtil.ensureKeyValue(kv).heapSize(); } countdown--; this.lastResult = rs; } } // Values == null means server-side filter has determined we must STOP } while (remainingResultSize > 0 && countdown > 0 && nextScanner(countdown, values == null));  
     //缓存里面有就从缓存里面取
     if (cache.size() > 0) { return cache.poll(); }      return null; }

  从next方法里面能够看出来,它是一次取caching条数据,而后下一次获取的时候,先把上次获取的最后一个给排除掉,再获取下来保存在cache当中,只要缓存不空,就一直在缓存里面取。

  好了,至此Scan到此结束。

相关文章
相关标签/搜索