HBase海量数据存储

1.简介

 

 

 

HBase是一个基于HDFS的、分布式的、面向列的非关系型数据库。算法

 

HBase的特色shell

1.海量数据存储,HBase表中的数据可以容纳上百亿行*上百万列。数据库

2.面向列的存储,数据在表中是按照列进行存储的,可以动态的增长列并对列进行各类操做。apache

3.准实时查询,HBase在海量的数据量下可以接近准实时的查询(百毫秒之内)数组

4.多版本,HBase中每一列的数据均可以有多个版本。缓存

5.可靠性,HBase中的数据存储于HDFS中且依赖于Zookeeper进行Master和RegionServer的协调管理。 服务器

 

HBase与关系型数据库的区别负载均衡

1.HBase中的数据类型只有String,而关系型数据库中有char、varchar、int等。分布式

2.HBase中只有普通的增删改查操做,没有表与表之间的链接、子查询等,若想要在HBase中进行复杂的操做则应该使用Phoenix。工具

3.HBase是基于列进行存储的,所以在查询指定列的数据时效率会很高,而关系型数据库是基于行存储,每次查询都要查询整行。

4.HBase适合海量数据存储,而关系型数据库通常一张表不超过500M,不然就要考虑分表操做。

5.HBase中为空的列不占用存储空间,表的设计能够很是稀疏,而关系型数据库中表的设计较谨密。

6.HBase不支持事务,而非关系型数据库支持事务。 

7.HBase区分大小写,而SQL不区分大小写。

 

2.HBase的表结构

 

 

 

*HBase中的表由RowKey、ColumnFamily、Column、Timestamp组成。

 

RowKey

记录的惟一标识,至关于关系型数据库中的主键。

*RowKey最大长度为64KB且按字典顺序进行排序存储。

*HBase会自动为RowKey加上索引,当按RowKey查询时速度很快。

 

ColumnFamily

列簇至关于特定的一个类别,每一个列簇下能够有任意数量个列,而且列是动态进行添加的,只在插入数据后存在,HBase在建立表时只须要指定表名和列簇便可。

*一个列簇下的成员有着相同的前缀,使用冒号来对列簇和列名进行分隔。

*一张表中的列簇最好不超过5个。

 

Column

列只有在插入数据后才存在,且列在列簇中是有序的。

*每一个列簇下的列数没有限制。

 

Timestamp

HBase中的每一个键值对都有一个时间戳,在进行插入时由HBase进行自动赋值。

 

 

3.HBase的物理模型

 

 

 

 

 

 

Master

1.处理对表的添加、删除、查询等操做。

2.进行RegionServer的负载均衡(Region与RegionServer的分配)

3.在RegionServer宕机后负责RegionServer上的Region转移(经过WAL日志)

*Master失效仅会致使meta数据和表没法被修改,表中的数据仍然能够进行读取和写入。

 

RegionServer

1.处理对表中数据的添加、删除、修改、查询等操做。

2.维护Region并将Region中StoreFile写入到HDFS中。

3.当Region中的数据达到必定大小时进行Region的切分。

 

Region

1.表中的数据存储在Region中,每一个Region都由RegionServer进行管理。

2.每一个Region都包含MemoryStore和StoreFile,MemoryStore中的数据位于内存,每当MemoryStore中的数据达到128M时将会生成一个StoreFile并写入到HDFS中。

3.Region中每一个列簇对应一个MemoryStore,能够有多个StoreFile,当StoreFile的数量超过必定时,会进行StoreFile的合并,将多个StoreFile文件合并成一个StoreFile,当StoreFile文件的大小超过必定阀值时,会进行Region的切分,由Master将新Region分配到相应的RegionServer中,实现负载均衡。

  

Zookeeper在HBase中的做用

1.保证Master的高可用性,当状态为Active的Master没法提供服务时,会马上将状态为StandBy的Master切换为Active状态。

2.实时监控RegionServer集群,当某个RegionServer节点没法提供服务时将会通知Master,由Master进行RegionServer上的Region转移以及从新进行负载均衡。

3.当HBase集群启动后,Master和RegionServer会分别向Zookeeper进行注册,会在Zookeeper中存放HBase的meta表数据,Region与RegionServer的关系、以及RegionServer的访问地址等信息。

*meta表中维护着TableName、RowKey和Region的关联关系。

 

HBase处理读取和写入请求的流程

HBase处理读取请求的过程

1.客户端链接Zookeeper,根据TableName和RowKey从Meta表中计算出该Row对应的Region。

2.获取该Region所关联的RegionServer,并获取RegionServer的访问地址。

3.访问RegionServer,找到对应的Region。

4.若是Region的MemoryStore中有该Row则直接进行获取,不然从StoreFile中进行查询。

 

HBase处理写入请求的过程

1.客户端链接Zookeeper,根据TableName找到其Region列表。

2.经过必定算法计算出要写入的Region。

3.获取该Region所关联的RegionServer并进行链接。

4.把数据分别写到HLog和MemoryStore中。

5.每当MemoryStore中的大小达到128M时,会生成一个StoreFile。

6.当StoreFile的数量超过必定时,会进行StoreFile的合并,将多个StoreFile文件合并成一个StoreFile,当StoreFile的文件大小超过必定阈值时,会进行Region的切分,由Master将新Region分配到相应的RegionServer中,实现负载均衡。

 

*在第一次读取或写入时才须要链接Zookeeper,会将Zookeeper中的相关数据缓存到本地,日后直接从本地进行读取,当Zookeeper中的信息发生变化时,再经过通知机制通知客户端进行更新。

 

 

HBase在HDFS中的目录

1.tmp目录:当对HBase的表进行建立和删除时,会将表移动到该目录中进行操做。

2.MasterProcWALs目录:预写日志目录,主要用于存储Master的操做日志。

3.WALs目录:预写日志目录,主要用于存储RegionServer的操做日志。

4.data目录:存储Region中的StoreFile。

5.hbase.id文件:HBase集群的惟一标识。

6.hbase.version文件:HBase集群的版本号。

7.oldWALs目录:当WALs目录下的日志文件超过必定时间后,会将其移动到oldWALs目录中,Master会按期进行清理。

 

 

4.HBase集群的搭建

 

1.安装JDK和Hadoop

因为HBase是经过JAVA语言编写的,且HBase是基于HDFS的,所以须要安装JDK和Hadoop,并配置好JAVA_HOME环境变量。

 

因为HDFS通常都以集群的方式运行,所以须要搭建HDFS集群。

*在搭建HDFS集群时,须要相互配置SSH使之互相信任而且开放防火墙相应的端口,或者直接关闭防火墙。

 

2.安装Zookeeper并进行集群的搭建

因为HDFS HA依赖于Zookeeper,且HBase也依赖于Zookeeper,所以须要安装Zookeeper并进行集群的搭建。

 

3.安装HBase

1.从CDH中下载HBase并进行解压:http://archive.cloudera.com/cdh5/cdh/5/ 

 

2.修改hbase-env.sh配置文件

#设置JDK的安装目录
export JAVA_HOME=/usr/jdk8/jdk1.8.0_161

#true则使用hbase自带的zk服务,false则使用外部的zk服务.
export HBASE_MANAGES_ZK=flase

 

3.修改hbase-site.xml配置文件

  <!-- 指定HBase日志的存放目录 -->  
  <property> 
    <name>hbase.tmp.dir</name>  
    <value>/usr/hbase/hbase-1.2.8/logs</value> 
  </property>  
  <!-- 指定HBase中的数据存储在HDFS中的目录 -->  
  <property> 
    <name>hbase.rootdir</name>  
    <value>hdfs://nameservice:8020/hbase</value> 
  </property>  
  <!-- 设置是不是分布式 -->  
  <property> 
    <name>hbase.cluster.distributed</name>  
    <value>true</value> 
  </property>  
  <!-- 指定HBase使用的ZK地址 -->  
  <property> 
    <name>hbase.zookeeper.quorum</name>  
    <value>192.168.1.80:2181,192.168.1.81:2181,192.168.1.82:2181</value> 
  </property> 

 

4.修改regionservers文件,配置充当RegionServer的节点

*值能够是主机名或者IP地址

*若是Hadoop配置了HDFS HA高可用集群,那么就会有两个NameNode和一个NameService,此时就须要将HDFS的core-site.xml和hdfs-site.xml配置文件复制到HBase的conf目录下,且hbase-site.xml配置文件中的hbase.rootdir配置项的HDFS地址指向NameService的名称。

 

5.NTP时间同步

NTP是一个时间服务器,做用是使集群中的各个节点的时间都保持一致。

因为在HBase集群中,Zookeeper与HBase对时间的要求较高,若是两个节点之间的时间相差过大时,那么整个集群就会崩溃,所以须要使各个节点的时间都保持一致。

#查看是否安装了NTP服务
rpm -qa|grep ntp

#安装NTP服务
yum install ntp -y

#从NTP服务器中获取时间并同步本地
ntpdate 192.168.1.80

*在实际的应用场景中,能够本身搭建NTP服务器,也可使用第三方开源的NTP服务器,如阿里等。

使用 “ntpdate NTP服务器地址” 命令从NTP服务器中获取时间并同步本地,通常配合Linux的crontab使用,每隔5分钟进行一次时间的同步。

 

4.启动集群

使用bin目录下的start-hbase.sh命令启动集群,那么会在当前节点中启动一个Master和RegionSever进程,并经过SSH访问其它节点,启动RegionServer进程。

 

因为HBase的Master HA集群是经过Zookeeper进行协调的,须要手动在其余节点中启动Master,Zookeeper能保证当前HBase集群中有且只有一个Master处于Active状态,当状态为Active的Master没法正常提供服务时,会将处于StandBy的Master的状态修改成Active。 

 

*当HBase集群启动后,能够访问http:/localhost:16030,进入HBase的Web监控页面。

 

 

5.使用Shell操做HBase

 

使用bin/hbase shell命令进行HBase的Shell操做

#建立表 create 'tableName' , 'columnFamily' , 'columnFamily...' #添加记录 put 'tableName' , 'rowkey' , 'columnFamily:column' , 'value' #查询记录 get 'tableName' , 'rowkey' #统计表的记录数 count 'tableName' #删除记录 deleteall 'tableName' , 'rowkey' #删除记录的某一列 delete 'tableName' , 'rowkey' ,'columnFamily:column' #禁用表 disable 'tableName' #启动表 enable 'tableName' #查看表是否被禁用 is_disabled 'tableName' #删除表 drop 'tableName' #查看表中的全部记录 scan 'tableName' #查看表中指定列的全部记录 scan 'tableName' , {COLUMNS=>'columnFamily:column'} #检查表是否存在 exists 'tableName' #查看当前HBase中的表 list

 

*在删除表时须要禁用表,不然没法删除。

*使用put相同rowkey的一条数据来进行记录的更新,仅会更新列相同的值。

 

 

6.使用JAVA操做HBase

 

1.导入相关依赖

<dependency>
  <groupId>org.apache.hbase</groupId>
  <artifactId>hbase-client</artifactId>
  <version>1.2.8</version>
</dependency>

 

2.初始化配置

使用HBaseConfiguration的create()静态方法建立一个Configuration实例,用于封装环境配置信息。

Configuration config = HBaseConfiguration.create(); config.set("hbase.zookeeper.quorum","192.168.1.80,192.168.1.81,192.168.1.82"); config.set("hbase.zookeeper.property.clientPort","2181");

*此方法会默认加载classpath下的hbase-site.xml配置文件,若是没有此配置文件则须要手动进行环境的配置。

 

3.建立HBase链接对象

Connection conn = ConnectionFactory.createConnection(config);

 

4.进行表的管理

*使用Admin类进行HBase表的管理,经过Connection实例的getAdmin()静态方法返回一个Admin实例。

//判断表是否存在
boolean tableExists(TableName); //遍历HBase中的表定义
HTableDescriptor [] listTables(); //遍历HBase中的表名称
TableName [] listTableNames(); //根据表名获取表定义
HTableDescriptor getTableDescriptor(TableName); //建立表
void createTable(HTableDescriptor); //删除表
void deleteTable(TableName); //启用表
void enableTable(TableName); //禁用表
void disableTable(TableName); //判断表是不是启用状态
boolean isTableEnabled(TableName); //判断表是不是禁用状态
boolean isTableDisabled(TableName); //为表添加列簇
void addColumn(TableName,HColumnDescriptor); //删除表中的列簇
void deleteColumn(TableName,byte); //修改表中的列簇
void modifyColumn(TableName,HColumnDescriptor);

 

TableName实例用于封装表名称。

HTableDescriptor实例用于封装表定义,包括表的名称、表的列簇等。

HColumnDescriptor实例用于封装表的列簇。

 

5.对表中的数据进行增删改查

使用Table类进行表数据的增删改查,经过Connection的getTable(TableName)静态方法返回一个Table实例。

//判断指定RowKey的数据是否存在
boolean exists(Get get); //根据RowKey获取数据
Result get(Get get); //根据多个RowKey获取数据
Result [] get(List<Get>); //获取表的扫描器
ResultScanner getScanner(Scan); //添加数据
void put(Put); //批量添加数据
void put(List<Put>); //删除数据
void delete(Delete); //批量删除数据
void delete(List<Delete>)

 

使用Get实例封装查询参数,使用其构建方法设置RowKey。

使用Put实例封装新增和更新参数,使用其构建方法设置RowKey,使用其addColumn(byte[] family , byte[] qualifier , byte[] value)方法分别指定列簇、列名、列值。

使用Delete实例封装删除参数,使用其构建方法设置RowKey。

使用Scan实例封装扫描器的查询条件,使用其addFamily(byte[] family)方法设置扫描的列簇,使用其addColumn(byte[] family , byte[] qualifier)方法分别指定要扫描的列簇和列名。

 

*在进行表的增删改查时,方法参数大多都是字节数组类型,可使用HBase Java提供的Bytes工具类进行字符串和字节数组之间的转换。

*在进行查询操做时,会返回Result实例,Result实例包含了一个RowKey的全部键值对(cell,不区分列簇),能够经过Result实例的listCells()方法获取其包含的全部cell,借助CellUtil工具类获取Cell实例中对应的RowKey、Family、Qualifier、Value等属性信息。

*在使用getScanner扫描时,返回的ResultScanner接口继承Iterable接口,其泛型是Result,所以能够理解成ResultScanner是Result的一个集合。

 

6.完整的HBaseUtil

/**
 * @Auther: ZHUANGHAOTANG
 * @Date: 2018/11/26 11:40
 * @Description:
 */
public class HBaseUtils {

    private static final Logger logger = LoggerFactory.getLogger(HBaseUtils.class);

    /**
     * ZK集群地址
     */
    private static final String ZK_CLUSTER_HOSTS = "192.168.1.80,192.168.1.81,192.168.1.82";

    /**
     * ZK端口
     */
    private static final String ZK_CLUSTER_PORT = "2181";

    /**
     * HBase全局链接
     */
    private static Connection connection;

    static {
        //默认加载classpath下hbase-site.xml文件
        Configuration configuration = HBaseConfiguration.create();
        configuration.set("hbase.zookeeper.quorum", ZK_CLUSTER_HOSTS);
        configuration.set("hbase.zookeeper.property.clientPort", ZK_CLUSTER_PORT);
        try {
            connection = ConnectionFactory.createConnection(configuration);
        } catch (Exception e) {
            logger.info("初始化HBase链接失败:", e);
        }
    }

    /**
     * 返回链接
     */
    public static Connection getConnection() {
        return connection;
    }

    /**
     * 建立表
     */
    public static void createTable(String tableName, String... families) throws Exception {
        Admin admin = connection.getAdmin();
        if (admin.tableExists(TableName.valueOf(tableName))) {
            throw new UnsupportedOperationException("tableName " + tableName + " is already exists");
        }
        HTableDescriptor descriptor = new HTableDescriptor(TableName.valueOf(tableName));
        for (String family : families)
            descriptor.addFamily(new HColumnDescriptor(family));
        admin.createTable(descriptor);
    }

    /**
     * 删除表
     */
    public static void deleteTable(String tableName) throws Exception {
        Admin admin = connection.getAdmin();
        if (admin.tableExists(TableName.valueOf(tableName))) {
            admin.disableTable(TableName.valueOf(tableName));
            admin.deleteTable(TableName.valueOf(tableName));
        }
    }

    /**
     * 获取全部表名称
     */
    public static TableName[] getTableNameList() throws Exception {
        Admin admin = connection.getAdmin();
        return admin.listTableNames();
    }

    /**
     * 获取全部表定义
     */
    public static HTableDescriptor[] getTableDescriptorList() throws Exception {
        Admin admin = connection.getAdmin();
        return admin.listTables();
    }

    /**
     * 为表添加列簇
     */
    public static void addFamily(String tableName, String family) throws Exception {
        Admin admin = connection.getAdmin();
        if (!admin.tableExists(TableName.valueOf(tableName))) {
            throw new UnsupportedOperationException("tableName " + tableName + " is not exists");
        }
        admin.addColumn(TableName.valueOf(tableName), new HColumnDescriptor(family));
    }

    /**
     * 删除表中指定的列簇
     */
    public static void deleteFamily(String tableName, String family) throws Exception {
        Admin admin = connection.getAdmin();
        admin.deleteColumn(TableName.valueOf(tableName), Bytes.toBytes(family));
    }

    /**
     * 为表添加一条数据
     */
    public static void put(String tableName, String rowKey, String family, Map<String, String> values) throws Exception {
        Table table = connection.getTable(TableName.valueOf(tableName));
        Put put = new Put(Bytes.toBytes(rowKey));
        for (Map.Entry<String, String> entry : values.entrySet())
            put.addColumn(Bytes.toBytes(family), Bytes.toBytes(entry.getKey()), Bytes.toBytes(entry.getValue()));
        table.put(put);
    }

    /**
     * 批量为表添加数据
     */
    public static void batchPut(String tableName, String family, Map<String, Map<String, String>> values) throws Exception {
        Table table = connection.getTable(TableName.valueOf(tableName));
        List<Put> puts = new ArrayList<>();
        for (Map.Entry<String, Map<String, String>> entry : values.entrySet()) {
            Put put = new Put(Bytes.toBytes(entry.getKey()));
            for (Map.Entry<String, String> subEntry : entry.getValue().entrySet())
                put.addColumn(Bytes.toBytes(family), Bytes.toBytes(subEntry.getKey()), Bytes.toBytes(subEntry.getValue()));
            puts.add(put);
        }
        table.put(puts);
    }

    /**
     * 删除RowKey中的某列
     */
    public static void deleteColumn(String tableName, String rowKey, String family, String qualifier) throws Exception {
        Table table = connection.getTable(TableName.valueOf(tableName));
        Delete delete = new Delete(Bytes.toBytes(rowKey));
        delete.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier));
        table.delete(delete);
    }

    /**
     * 删除RowKey
     */
    public static void delete(String tableName, String rowKey) throws Exception {
        Table table = connection.getTable(TableName.valueOf(tableName));
        table.delete(new Delete(Bytes.toBytes(rowKey)));
    }

    /**
     * 批量删除RowKey
     */
    public static void batchDelete(String tableName, String... rowKeys) throws Exception {
        Table table = connection.getTable(TableName.valueOf(tableName));
        List<Delete> deletes = new ArrayList<>();
        for (String rowKey : rowKeys)
            deletes.add(new Delete(Bytes.toBytes(rowKey)));
        table.delete(deletes);
    }

    /**
     * 根据RowKey获取数据
     */
    public static Map<String, String> get(String tableName, String rowKey) throws Exception {
        Table table = connection.getTable(TableName.valueOf(tableName));
        Result result = table.get(new Get(Bytes.toBytes(rowKey)));
        List<Cell> cells = result.listCells();
        Map<String, String> cellsMap = new HashMap<>();
        for (Cell cell : cells) {
            cellsMap.put(Bytes.toString(CellUtil.cloneQualifier(cell)), Bytes.toString(CellUtil.cloneValue(cell)));
        }
        return cellsMap;
    }

    /**
     * 获取全表数据
     */
    public static Map<String, Map<String, String>> scan(String tableName) throws Exception {
        Table table = connection.getTable(TableName.valueOf(tableName));
        ResultScanner resultScanner = table.getScanner(new Scan());
        return getResult(resultScanner);
    }

    /**
     * 获取某列数据
     */
    public static Map<String, Map<String, String>> scan(String tableName, String family, String qualifier) throws Exception {
        Table table = connection.getTable(TableName.valueOf(tableName));
        Scan scan = new Scan();
        scan.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier));
        ResultScanner resultScanner = table.getScanner(scan);
        return getResult(resultScanner);
    }

    private static Map<String, Map<String, String>> getResult(ResultScanner resultScanner) {
        Map<String, Map<String, String>> resultMap = new HashMap<>();
        for (Result result : resultScanner) {
            List<Cell> cells = result.listCells();
            Map<String, String> cellsMap = new HashMap<>();
            for (Cell cell : cells)
                cellsMap.put(Bytes.toString(CellUtil.cloneQualifier(cell)), Bytes.toString(CellUtil.cloneValue(cell)));
            resultMap.put(Bytes.toString(result.getRow()), cellsMap);
        }
        return resultMap;
    }

}
相关文章
相关标签/搜索