Python 通过 Thrift 接口连接 HBase 读取和存储数据

介绍: python

HBase:开源的分布式数据库

资料介绍:http://www.oschina.net/p/hbase

Thrift:一个软件框架,用来进行可扩展且跨语言的服务的开发。最初由Facebook开发,作为Hadoop的一个工具,提供跨语言服务开发;

资料介绍:http://dongxicheng.org/search-engine/thrift-guide/

官方使用手册:http://download.csdn.net/detail/wyjzt999/5141006 (从安装到使用都很全面)

我们项目里客户端是用python开发,所以需要Thrift提供server端,通过thrift对Hbase进行数据读写操作,性能非常不错,而且可以在Hadoop集群上做并行拓展,稳定性高,Facebook内部通讯也是采用thrift来做;

 

首先学习一下Hbase的表结构:

Row Key

行键 (Row key) 可以是任意字符串(最大长度是 64KB,实际应用中长度一般为 10-100 bytes),在hbase内部,row key保存为字节数组。

列族 (column family)

hbase表中的每个列,都归属于某个列族。列族是表的schema的一部分(而列不是),必须在使用表之前定义。列名都以列族作为前缀。例如courses:history , courses:math 都属于 courses 这个列族。

时间戳

HBase中通过row和columns确定的一个存储单元称为cell。每个cell都保存着同一份数据的多个版本。版本通过时间戳来索引。时间戳的类型是 64位整型。时间戳可以由hbase(在数据写入时自动)赋值,此时时间戳是精确到毫秒的当前系统时间。时间戳也可以由客户显式赋值。如果应用程序要避免数据版本冲突,就必须自己生成具有唯一性的时间戳。每个cell中,不同版本的数据按照时间倒序排序,即最新的数据排在最前面。

为了避免数据存在过多版本造成的管理(包括存储和索引)负担,hbase提供了两种数据版本回收方式。一是保存数据的最后n个版本,二是保存最近一段时间内的版本(比如最近七天)。用户可以针对每个列族进行设置。

对Hbase而言,表结构设计会对系统的性能以及开销造成很大的影响;


 

 1.首先建立与Thrift server端的连接

from thrift import Thrift
from thrift.transport import TSocket, TTransport
from thrift.protocol import TBinaryProtocol
from hbase import Hbase

# Address and port of the Thrift server.
# NOTE: `host` and `port` are not defined in this snippet; set them before running.
transport = TSocket.TSocket(host, port)
# Optional: set a socket timeout (presumably milliseconds, per the Thrift
# Python TSocket API -- confirm against the Thrift version in use).
transport.setTimeout(5000)
# Choose the transport wrapper (TFramedTransport or TBufferedTransport).
trans = TTransport.TBufferedTransport(transport)
# Choose the wire protocol.
protocol = TBinaryProtocol.TBinaryProtocol(trans)
# Create the client on top of the protocol.
client = Hbase.Client(protocol)
# Open the connection.
transport.open()

2.然后就可以做具体的操作,比如查表,删表,插入Row Key等等
from hbase.ttypes import ColumnDescriptor, Mutation, BatchMutation, TRegionInfo
from hbase.ttypes import IOError, AlreadyExists

# List the names of the existing tables.
client.getTableNames()

# Create a new table.
_TABLE = "keyword"
# Column family "data" keeps up to 10 versions per cell; the family name is
# written with a trailing ':'.
demo = ColumnDescriptor(name='data:', maxVersions=10)
# createTable is a method of the Thrift client, not a free function.
client.createTable(_TABLE, [demo])

# Row key shared by the writes and reads below (it was undefined before).
row = '00001'

# Build the mutations for two columns: data:url and data:word.
tmp1 = [Mutation(column="data:url", value="www.baidu.com")]
tmp2 = [Mutation(column="data:word", value="YaGer")]
# Write both columns (table name, row key, mutations). The second call must
# send tmp2; re-sending tmp1 would leave data:word unwritten.
client.mutateRow(_TABLE, row, tmp1)
client.mutateRow(_TABLE, row, tmp2)


# Read data back through the raw Thrift client. getByMaxver/getColumns are
# wrappers defined on the Failover class further down, not Hbase.Client
# methods; the underlying Thrift calls are getVer and getRow.
# Fetch up to 10 versions of one cell.
client.getVer(_TABLE, row, 'data:word', 10)
# Fetch the whole row (every column of every family).
client.getRow(_TABLE, row)



3.支持thrift并行拓展和失效转移Failover机制

#file name:hbaseconn.py
#! /usr/bin/env python  
# -*- coding: utf-8 -*-
#提供创建链接的方法和取数据操做的方法
import logging
import traceback
import time,sys
from unittest import TestCase, main
import socket

from thrift import Thrift  
from thrift.transport import TSocket  
from thrift.transport import TTransport  
from thrift.protocol import TBinaryProtocol  
from hbase import Hbase  

from hbase.ttypes import IOError as HbaseIOError, AlreadyExists
from hbase.ttypes import ColumnDescriptor, Mutation, BatchMutation


# Sentinel cell used to probe a Thrift server: CenterDb._initTable creates the
# table and writes the value, CenterDb._get reads it back with getVer.
_TABLE = 'thrift_check'      # table name
_ROW = 'test_for_thrift'     # row key
_DATA = 'data:word'          # column (family:qualifier)
_VALUE = 'Flag'              # value written to the sentinel cell

class DbConnError(Exception):
    """Database connection error.

    Meant to represent a timeout or any other connection-related exception
    coming from the lower-level API.
    """

class Connection:
    """Plain record grouping a transport, its client, and the server address."""

    def __init__(self, trans, client, addr, port):
        # Transport/client pair as returned by CenterDb._open.
        self.trans, self.client = trans, client
        # The address is stored under the attribute name ``hp``.
        self.hp, self.port = addr, port

class CenterDb:
    """Helpers for opening HBase Thrift connections and probing them.

    The classmethods (``open``, ``_open``, ``_initTable``, ``_get``,
    ``_reconnect``) operate on raw transport/client objects. An instance
    simply pairs one transport (``self.t``) with one client (``self.c``).

    NOTE(review): ``logerr`` and ``dbg`` are called below but are neither
    defined nor imported in the visible part of this file -- presumably
    project logging helpers; confirm they are in scope.
    """

    @classmethod
    def open(cls, host_port):
        """Create one ``Connection`` per ``(host, port)`` pair.

        Args:
            host_port: iterable of ``(host, port)`` pairs.

        Returns:
            List of ``Connection`` objects in input order. The same list is
            stored on ``cls.tc_list`` and is replaced on every call.
        """
        cls.tc_list = []
        for host, port in host_port:
            trans, client = cls._open(host, port)
            cls.tc_list.append(Connection(trans, client, host, port))
        return cls.tc_list

    @classmethod
    def _open(cls, host, port):
        """Build a buffered binary-protocol client and try to open it.

        A transport error while opening is logged, not raised: the
        (unopened) transport and client are still returned so the caller can
        retry later with ``trans.open()``.

        Returns:
            Tuple ``(trans, client)``.
        """
        sock = TSocket.TSocket(host, port)
        sock.setTimeout(5000)
        trans = TTransport.TBufferedTransport(sock)
        protocol = TBinaryProtocol.TBinaryProtocol(trans)
        client = Hbase.Client(protocol)
        try:
            trans.open()
        except TSocket.TTransportException:
            logerr('CenterDb(Hbase) Open error(%s, %d)' % (host, port))
        return trans, client

    @classmethod
    def _initTable(cls, client):
        """Create the sentinel table and write the sentinel cell.

        Returns:
            True when the table was created and written, or when it already
            existed (``AlreadyExists``). Any other exception propagates.
        """
        dat = ColumnDescriptor(name='data', maxVersions=1)
        tmp = [Mutation(column=_DATA, value=_VALUE)]
        try:
            client.createTable(_TABLE, [dat])
            client.mutateRow(_TABLE, _ROW, tmp)
        except AlreadyExists:
            # The table is already there; nothing more to set up.
            return True
        dbg("Create Table For Thrift Test Success!")
        return True

    @classmethod
    def _get(cls, client):
        """Read the sentinel cell; return True if the call did not raise."""
        client.getVer(_TABLE, _ROW, _DATA, 1)
        return True

    @classmethod
    def _reconnect(cls, trans):
        """Close the transport and open it again."""
        trans.close()
        trans.open()

    def __init__(self, transport, client):
        self.t = transport
        self.c = client

    def __str__(self):
        # Both fields are objects, so format them with %s; the previous %d
        # raised TypeError for the client.
        return 'CenterDb.open(%s, %s)' % (self.t, self.c)

    def __del__(self):
        # Guard against partially constructed instances (no ``t`` attribute).
        trans = getattr(self, 't', None)
        if trans is not None:
            trans.close()

    def getColumns(self, table, row):
        """Return the columns of ``row`` in ``table``, or ``{}`` if absent.

        Uses this instance's own client. The previous body called
        ``self._failover``, which CenterDb does not define.
        """
        tr_list = self.c.getRow(table, row)
        if not tr_list:
            return {}
        return tr_list[0].columns



#file name: thriftmanage.py
#! /usr/bin/python
# -*- coding: utf-8 -*-
#提供管理thrift链接的方法
#建一个线程循环检测thrift的可以使用性,非链接池
import logging
import time,sys,random
import threading
from hbase import Hbase  
from hbase.ttypes import IOError as HbaseIOError
from hbase.ttypes import ColumnDescriptor, Mutation, BatchMutation
import hbaseconn
from hbaseconn import CenterDb


class Failover(threading.Thread):
    """Thrift connection manager with failover and a background health check.

    One connection is opened per entry of ``serverlist``. The connection at
    index ``int(serverid) % len(serverlist)`` is this scheduler's own
    connection (``my_conn``); operations go through ``cur_conn``, which is
    ``my_conn`` until an operation fails and ``_switch`` picks another one.
    ``run`` (the thread body) probes a dedicated ``check_conn`` every second
    and, once the home server answers again, routes back to ``my_conn``.
    This is a health checker, not a connection pool.

    NOTE(review): ``logerr`` and ``dbg`` are called below but are neither
    defined nor imported in the visible part of this file -- presumably
    project logging helpers; confirm they are in scope.
    """

    def __init__(self, serverid, serverlist):
        """Open all connections and prepare the health-check connection.

        Args:
            serverid: scheduler id; ``int(serverid)`` selects the home server.
            serverlist: sequence of ``(host, port)`` pairs of Thrift servers.
        """
        threading.Thread.__init__(self)

        self.serverlist = serverlist
        self.serverid = serverid
        self.lenlist = len(serverlist)

        self.conn_list = []
        self.invalid_con_list = []
        # cur_conn: the connection operations currently go through.
        # my_conn:  the connection this scheduler should normally use.
        self.cur_conn = None
        self.my_conn = None
        # Initialised before any connection work so the flag always exists.
        self.switched_flag = False

        self._makeConn()
        self._getConn()
        self._init_for_check()

    def _makeConn(self):
        """Open a connection to every server in ``serverlist``."""
        self.conn_list = CenterDb.open(self.serverlist)

    def _getConn(self):
        """Pick ``my_conn``/``cur_conn`` and collect the rest in ``other_conn``.

        Returns:
            True.
        """
        home = self.conn_list[int(self.serverid) % self.lenlist]
        self.cur_conn = self.my_conn = home

        self.other_conn = self.conn_list[:]
        self.other_conn.remove(self.my_conn)
        return True

    def _init_for_check(self):
        """Ensure the sentinel table exists and open the check connection."""
        try:
            if not CenterDb._initTable(self.my_conn.client):
                dbg("Error In Create Table For Thrift Test!")
        except Exception as e:
            # Best effort: failing to create the table must not abort start-up.
            dbg("init_for_check thrift:%s" % e)

        # Dedicated connection to the home server, used only by the checker.
        self.check_conn = CenterDb.open(
            ((self.my_conn.hp, self.my_conn.port),))[0]

    def _switch(self):
        """Point ``cur_conn`` at a randomly chosen healthy fallback server.

        Makes at most ``3 * lenlist`` attempts. Each attempt picks a random
        fallback connection, checks it, and if the check fails reconnects it
        and checks once more.

        Returns:
            True if ``cur_conn`` was switched, False if there are no fallback
            connections or every attempt failed.
        """
        if not self.other_conn:
            return False

        # A bounded loop guarantees termination even when an attempt raises.
        for trycount in range(1, 3 * self.lenlist + 1):
            try:
                tmp_conn = random.choice(self.other_conn)
                if self._checker(tmp_conn):
                    self.cur_conn = tmp_conn
                    dbg('Schd%s _switch cur_conn: %s' % (self.serverid, self.cur_conn.hp))
                    return True

                logerr('Schd%s _switch for %d times' % (self.serverid, trycount))
                # Close this transport and try once more on a fresh one.
                CenterDb._reconnect(tmp_conn.trans)
                if self._checker(tmp_conn):
                    self.cur_conn = tmp_conn
                    return True
            except Exception:
                # Deliberate best effort: a failed attempt moves to the next.
                continue
        return False

    def _failover(self, oper, *args, **kwargs):
        """Call ``oper`` on the current client, failing over once on error.

        Args:
            oper: name of the Thrift client method to call.
            *args, **kwargs: forwarded to that method.

        Returns:
            The method's result. ``[]`` if HBase reported an ``IOError`` or
            no healthy fallback server was found. An exception raised by the
            retry on the fallback server propagates to the caller.
        """
        result = []
        try:
            return getattr(self.cur_conn.client, oper)(*args, **kwargs)
        except HbaseIOError as e:
            # Server-side HBase error: the connection itself is fine.
            logerr("_failover : %s " % e)
        except Exception as e:
            logerr( 'Schd%s _failover : Connect to %s thrift Server closed! Choose another......Reason:%s ' % \
                (self.serverid, self.cur_conn.hp, e))
            self.cur_conn.trans.close()
            # Tell the checker thread that traffic left the home server.
            self.switched_flag = True
            if self._switch():
                logerr('Schd%s _failover : Now using %s server' % (self.serverid, self.cur_conn.hp))
                result = getattr(self.cur_conn.client, oper)(*args, **kwargs)
            else:
                logerr( 'Schd%s _failover : Switch 3 rotate Find No Healthy Thrift server !' % self.serverid)
        return result

    def getByMaxver(self, table, row, column, max_ver):
        """Get up to ``max_ver`` versions of one cell.

        Args:
            table: table name
            row: row key
            column: column name
            max_ver: maximum number of versions to retrieve

        Returns:
            List of ``(value, timestamp)`` tuples; ``[]`` when nothing was
            returned. Calls slower than 15 ms are logged.
        """
        start_time = time.time()
        cell_list = self._failover('getVer', table, row, column, max_ver)
        if not cell_list:
            return []

        take = time.time() - start_time
        if take > 0.015:
            logerr('Hbase over 15ms:take %s ms' % str("%.3f" % (take*1000)))

        # A real list (not a lazy ``map`` object) on every Python version.
        return [(cell.value, cell.timestamp) for cell in cell_list]

    def getColumns(self, table, row):
        """Return the columns of ``row`` in ``table``, or ``{}`` if absent."""
        tr_list = self._failover('getRow', table, row)
        if not tr_list:
            return {}
        return tr_list[0].columns

    def _checker(self, conn):
        """Return True if ``conn`` is open and the sentinel read succeeds.

        Any exception during the probe is logged and reported as False.
        """
        try:
            if conn.trans.isOpen():
                CenterDb._get(conn.client)
                return True
            return False
        except Exception as e:
            logerr( 'Schd%s  _checker : Connect to %s closed! Please Restart now..... Reason: %s ' % \
                (self.serverid, conn.hp, e))
            return False

    def _restart(self):
        """Re-open ``my_conn`` and ``check_conn`` until the home server answers.

        Retries forever: close both transports, wait two seconds, re-open
        them, then read the sentinel cell.

        Returns:
            True once both transports are open and the probe succeeds.
        """
        while True:
            self.my_conn.trans.close()
            self.check_conn.trans.close()
            try:
                time.sleep(2)
                self.my_conn.trans.open()
                self.check_conn.trans.open()
                if self.my_conn.trans.isOpen() and self.check_conn.trans.isOpen():
                    CenterDb._get(self.check_conn.client)
                    return True
            except Exception as e:
                logerr('Schd%s _restart : Connect to %s is not restart yet ... Reason:%s ' % \
                    (self.serverid, self.my_conn.hp, e))
                self.my_conn.trans.close()
                self.check_conn.trans.close()

    def run(self):
        """Thread body: probe the home server every second, forever.

        When the probe fails, or traffic was switched to a fallback server,
        the home connection is restarted and ``cur_conn`` is reset to
        ``my_conn``.
        """
        while True:
            time.sleep(1)
            if self._checker(self.check_conn) and not self.switched_flag:
                continue
            if self._restart():
                logerr( 'Schd%s Connection from  %s to my:%s Recovered ! ' % \
                    (self.serverid,self.cur_conn.hp,self.my_conn.hp))
                self.cur_conn = self.my_conn
                self.switched_flag = False


使用thrift生成的代码中提供的方法有: void enableTable(Bytes tableName) void disableTable(Bytes tableName) bool isTableEnabled(Bytes tableName) void compact(Bytes tableNameOrRegionName) void majorCompact(Bytes tableNameOrRegionName) getTableNames() getColumnDescriptors(Text tableName) getTableRegions(Text tableName) void createTable(Text tableName, columnFamilies) void deleteTable(Text tableName) get(Text tableName, Text row, Text column) getVer(Text tableName, Text row, Text column, i32 numVersions) getVerTs(Text tableName, Text row, Text column, i64 timestamp, i32 numVersions) getRow(Text tableName, Text row) getRowWithColumns(Text tableName, Text row,  columns) getRowTs(Text tableName, Text row, i64 timestamp) getRowWithColumnsTs(Text tableName, Text row,  columns, i64 timestamp) getRows(Text tableName,  rows) getRowsWithColumns(Text tableName,  rows,  columns) getRowsTs(Text tableName,  rows, i64 timestamp) getRowsWithColumnsTs(Text tableName,  rows,  columns, i64 timestamp) void mutateRow(Text tableName, Text row,  mutations) void mutateRowTs(Text tableName, Text row,  mutations, i64 timestamp) void mutateRows(Text tableName,  rowBatches) void mutateRowsTs(Text tableName,  rowBatches, i64 timestamp) i64 atomicIncrement(Text tableName, Text row, Text column, i64 value) void deleteAll(Text tableName, Text row, Text column) void deleteAllTs(Text tableName, Text row, Text column, i64 timestamp) void deleteAllRow(Text tableName, Text row) void deleteAllRowTs(Text tableName, Text row, i64 timestamp) ScannerID scannerOpenWithScan(Text tableName, TScan scan) ScannerID scannerOpen(Text tableName, Text startRow,  columns) ScannerID scannerOpenWithStop(Text tableName, Text startRow, Text stopRow,  columns) ScannerID scannerOpenWithPrefix(Text tableName, Text startAndPrefix,  columns) ScannerID scannerOpenTs(Text tableName, Text startRow,  columns, i64 timestamp) ScannerID scannerOpenWithStopTs(Text tableName, Text startRow, Text stopRow,  columns, i64 timestamp) scannerGet(ScannerID id) 
scannerGetList(ScannerID id, i32 nbRows) void scannerClose(ScannerID id)    参考:http://yannramin.com/2008/07/19/using-facebook-thrift-with-python-and-hbase/

相关文章
相关标签/搜索