这2周主要将项目中写hbase的模块中原来的异步hbaseclient改写成了使用hbase原生的HTable对象。大概总结下改写过程当中和hj,xingchao发现的问题和解决方法。java
1.HTablePool的基本使用方式:数据库
因为HTable对象不是线程安全的,所以HBase提供HTablePool来支持多线程写入hbase,多线程同时从HTablePool 中取出HTable并写入是安全的。HTablePool的使用方法相似数据库链接,使用时从HTablePool中取出一个HTable,使用完后再 close放回HTablePool中。安全
Put put = new Put(rowkey); put.add(LOG_COLUMN_FAMILY,HOST_NAME_QUALIFIER,values[0]); HTableInterface table = HBaseClientFactory.getHTableByName(RAW_LOG_TABLE); try { table.put(put); } catch (IOException e) { throw new RuntimeException("Put Log meet exception",e); }finally {
HBaseClientUtil.closeHTable(table); }
2.HTablePool的maxsize。多线程
HTablePool有一个maxsize,HTablePool针对每一个表都有一个Pool,maxsize表示这个Pool的最大大小,在使用HTablePool的过程当中咱们发现这个值仍是有须要注意的地方。异步
在多线程使用HTablePool拿到同一个表的HTable时,若是线程个数大于maxsize会致使写入始终是autoflush!ide
public HTableInterface getTable(String tableName) { // call the old getTable implementation renamed to findOrCreateTable HTableInterface table = findOrCreateTable(tableName); // return a proxy table so when user closes the proxy, the actual table // will be returned to the pool return new PooledHTable(table); }
当拿到HTable时会建立一个HTable对象并包装成一个PooledHTable对象。Pooled作了什么纳,其余方法都没变,只是在close时有所不一样:this
public void close() throws IOException { returnTable(table); }
private void returnTable(HTableInterface table) throws IOException { // this is the old putTable method renamed and made private String tableName = Bytes.toString(table.getTableName()); if (tables.size(tableName) >= maxSize) { // release table instance since we're not reusing it this.tables.remove(tableName, table); this.tableFactory.releaseHTableInterface(table); return; } tables.put(tableName, table); }
能够看到若是tables.size大于maxsize,此时会去掉一个保存的HTable对象,而releaseHTableInterface实际调 用的就是HTable的close方法,close方法又会强制flushHTable的buffer,所以,若是咱们想不使用autoflush提高写 入速度失效。
线程
3.HTablePool type。code
HTablePool提供了几种方式:ReusablePool,RoundRobinPool,ThreadLocalPool。默认的是 reusable,因为2的缘由,咱们也能够考虑使用ThreadLocal的Pool,这样多线程写入时分别取本身线程的Pool,这样互不影响,写入 的效率也会比较高。对象
static class ThreadLocalPool<R> extends ThreadLocal<R> implements Pool<R> { private static final Map<ThreadLocalPool<?>, AtomicInteger> poolSizes = new HashMap<ThreadLocalPool<?>, AtomicInteger>(); public ThreadLocalPool() { } @Override public R put(R resource) { R previousResource = get(); if (previousResource == null) { AtomicInteger poolSize = poolSizes.get(this); if (poolSize == null) { poolSizes.put(this, poolSize = new AtomicInteger(0)); } poolSize.incrementAndGet(); } this.set(resource); return previousResource; }
4.HTable的WriteBufferSize和autoflush
若是想追求写入的速度咱们能够设置setWriteBufferSize为一个比较大的大小好比1M并autoflush为false,这样写 入的速度会有几十倍的提高,但若是BufferSize比较大也会带来写入不够实时的问题,尤为有些表的数据很小会好久都不flush。所以,咱们能够添 加按时间间隔的flush方式。
@Override public void put(final List<Put> puts) throws IOException { super.put(puts); needFlush(); } private void needFlush() throws IOException { long currentTime = System.currentTimeMillis(); if ((currentTime - lastFlushTime.longValue()) > flushInterval) { super.flushCommits(); lastFlushTime.set(currentTime); } }
HTablePool能够设置自定义的HTableFactory来建立咱们自定义的HTable。
pool = new HTablePool(conf, maxSize, tableFactory, PoolMap.PoolType.ThreadLocal);