Hadoop各组件详解(Hbase篇 持续更新版)

一.HBASE简介

在这里插入图片描述

1.1 什么是HBasenode

1)HBASE译为“Hadoop Database”,是一个高可靠性、高性能、列存储、可伸缩、实时读写的NoSQL数据库系统,利用HBASE技术可在廉价PC Server上搭建起大规模结构化存储集群
2)主要用来存储结构化和半结构化的松散数据
3)HBASE是Google Bigtable的开源实现,可是也有不少不一样之处。好比:Google Bigtable使用GFS做为其文件存储系统,HBASE利用Hadoop HDFS做为其文件存储系统;Google运行MAPREDUCE来处理Bigtable中的海量数据,HBASE一样利用Hadoop MapReduce来处理HBASE中的海量数据;Google Bigtable利用Chubby做为协同服务,HBASE利用Zookeeper做为协同服务
4)HBASE的目标是存储并处理大型的数据,更具体来讲是仅需使用普通的硬件配置,就可以处理由成千上万的行和列所组成的大型数据web

1.2 与传统数据库的对比shell

1)传统数据库遇到的问题:数据库

  • ① 数据量很大的时候没法存储
  • ② 没有很好的备份机制
  • ③ 数据达到必定数量开始缓慢,很大的话基本没法支撑

2)HBASE优点:apache

  • ① 线性扩展,随着数据量增多能够经过节点扩展进行支撑
  • ② 数据存储在HDFS上,备份机制健全
  • ③ 经过zookeeper协调查找数据,访问速度快

1.3 HBase集群中的角色vim

1)一个或者多个主节点 : HMaster
2)多个从节点 : HRegionServer
3)HBase依赖项 : Zookeeper,HDFS数组

2、HBASE数据模型

2.1 ROW KEY

1)访问hbase table中的行,只有三种方式:缓存

  • ① 经过单个row key访问 (get)
  • ② 经过row key的range(范围) (scan)
  • ③ 全表扫描 (scan)

2)Hbase会对表中的数据按照Row key排序(字典顺序)
3)Row key只能存储64k的字节数据(实际应用中长度通常为 10-100bytes),在Hbase内部,Row key保存形式为字节数组安全

2.2 Column Family列族 & Column列

1)HBase表中的每一个列都归属于某个列族,列族必须做为表模式(schema)定义的一部分预先给出。如 create ‘表名’, ‘列族名’
2)列名以列族做为前缀,每一个“列族”均可以有多个列成员(column);如course:math, course:english, 新的列族成员(列)能够随后按需、动态的加入
3)权限控制、存储以及调优都是在列族层面进行的
4)HBase把同一列族里面的数据存储在同一目录下,由几个文件保存bash

2.3 Timestamp时间戳

1)在HBase每一个cell存储单元对同一份数据有多个版本,根据惟一的时间戳来区分每一个版本之间的差别,不一样版本的数据按照时间倒序排序,最新的数据版本排在最前面
2)时间戳的类型是 64位整型
3)时间戳能够由HBase(在数据写入时自动)赋值,此时时间戳是精确到毫秒的当前系统时间
4)时间戳也能够由客户显式赋值,若是应用程序要避免数据版本冲突,就必须本身生成具备惟一性的时间戳
5)为了不数据存在过多版本形成的的管理 (包括存贮和索引)负担,hbase提供了两种数据版本回收方式:

  • ①.保存数据的最后n个版本
  • ②.保存最近一段时间内的版本(设置数据的生命周期TTL)

6)用户能够针对每一个列族进行设置

2.4 Cell单元格

1)由 RowKey 和 "ColumnFamily:Column"的坐标交叉决定
2)单元格是有版本的
3)单元格的内容是未解析的字节数组, 由 {row key, column( =<family> +<qualifier>), version} 惟一肯定的单元;cell中的数据是没有类型的,所有是字节码形式存贮

2.5 VersionNum

1)数据的版本号,每条数据能够有多个版本号,默认值为系统时间戳,类型为Long

2.6 Region

1)HBase自动把表水平划分红多个区域(region),每一个region会保存一个表里面某段连续的数据;每一个表一开始只有一个region,随着数据不断插入表,region不断增大,当增大到一个阀值的时候,region就会等分会两个新的region(裂变)

在这里插入图片描述
在这里插入图片描述
2)当table中的行不断增多,就会有愈来愈多的region。这样一张完整的表被切分红多个Region,保存在多个RegionServer上
3)Region是负载均衡的最小单元,最小单元就表示不一样的Region能够分布在不一样的HRegion server上。但一个Region是不会拆分到多个RegionServer上的

在这里插入图片描述
4)Region虽然是负载均衡的最小单元,但并非物理存储的最小单元;
事实上,Region由一个或者多个Store组成,每一个Store保存一个column family;
每一个Strore又由一个MemStore和0至多个StoreFile组成

在这里插入图片描述

3、HBASE各角色做用解析

3.1 Client

1)Client包含了访问Hbase的接口,另外Client还维护了对应的cache来加速Hbase的访问,好比cache的.META.元数据的信息

3.2 Zookeeper

1)保证任什么时候候,集群中只有一个Master,若是Master异常,会经过竞争机制产生新的Master提供服务
2)存贮全部Region的寻址入口
3)实时监控RegionServer的状态,将RegionServer的上线和下线信息实时通知给Master
4)存储HBase的schema和table元数据信息

3.3 Master

1)为RegionServer分配Region
2)负责RegionServer的负载均衡
3)发现失效的RegionServer并从新分配其上的Region
4)处理对schema更新请求

3.4 RegionServer

1)RegionServer维护Master分配给它的Region,处理对Region的IO请求
2)RegionServer负责切分在运行过程当中变得过大的Region;能够看到,Client访问Hbase上数据的过程并不须要Master参与(寻址访问Zookeeper和RegionServer,数据读写访问RegioneServer),Master仅仅维护者table和Region的元数据信息,负载很低
3)负责和底层HDFS的交互,存储数据到HDFS
4)负责Storefile的合并工做


4、HBase基于HDFS、ZK架构图

在这里插入图片描述

5、HBASE读写流程

5.1 写数据

在这里插入图片描述

1)Client 访问 Zookeeper,获取 hbase:meta表位于哪一个 RegionServer

2)访问对应的RegionServer,获取hbase:meta表,根据rowkey肯定当前将要写入的数据所对应的RegionServer
服务器和Region,并将该table的Region信息以及meta表的位置信息缓存在客户端的 Meta cache,方便下次访问

3)Client与目标RegionServer进行通信,发起写入数据请求

4)Client将数据写入(追加)到HLog(WAL,Write ahead log,预写日志),以防止数据丢失

5)而后将数据写入对应的MemStore,数据会在MemStore内进行排序

6)若是HLog和Memstore都写入成功,则这条数据写入成功,向客户端发送消息,告知写入成功,若是其中一个写入失败,就表示此次写入失败

7)等达到MemStore的阈值后,将数据flush到磁盘成为StoreFile,当StoreFile达到阈值(默认3个)后进行compact操做,将多个StoreFile合并成一个大StoreFile

注意:

1)此过程没有Master的参与
2)客户端在写数据时会先去查看本地的meta cache,若是有以前的缓存数据,直接经过缓存元数据访问 RegionServer,这时不经过Zookeeper获取meta表去查找元数据,。若是集群发生某些变化致使hbase:meta元数据更改,客户端再根据本地元数据表请求的时候就会发生异常,此时客户端须要从新加载一份最新的元数据表到本地
3MemStore触发时机

5.2 读数据

在这里插入图片描述

1)Client 访问 Zookeeper,获取 hbase:meta表位于哪一个 RegionServer

2)访问对应的RegionServer,获取hbase:meta表,根据rowkey肯定当前将要读取的数据位于哪一个RegionServer
中的哪一个Region中。并将该table的region信息以及meta表的位置缓存在客户端 meta cache,方便下次访问

3)Client向该RegionServer服务器发起读取数据请求,而后RegionServer收到请求并响应

4)分别在BlockCache(读缓存)、MemStore和StoreFile(BlockCache中有的数据
在StoreFile中就再也不读取)中查询目标数据,并将查到的全部数据进行合并;
此处的全部数据指的是同一条数据的不一样版本(Time Stamp)或者不一样的类型(Put/Delete)

5)将从文件中查询到的数据块(Block,HFile数据存储单元,默认大小为64KB)缓存到Block Cache

6)将合并后的最终结果返回给客户端

注意:

1)此过程没有Master的参与
2)客户端在读数据时会先去查看本地的meta cache,若是有以前的缓存数据,直接经过缓存元数据访问 RegionServer,这时不经过Zookeeper获取meta表去查找元数据,。若是集群发生某些变化致使hbase:meta元数据更改,客户端再根据本地元数据表请求的时候就会发生异常,此时客户端须要从新加载一份最新的元数据表到本地

6、HBASE集群安装

前提:Hadoop集群要启动正常,Zookeeper集群启动正常

1.解压tar包

tar -zxf hbase-1.2.0-cdh5.14.0.tar.gz ../servers/hbase

2.配置环境变量(根据本身环境配置)

vim /etc/profile.d/hbase.sh

export HBASE_HOME=/export/servers/hbase
export PATH=$PATH:$HBASE_HOME/bin

source /etc/profile

3.修改配置文件

cd $HBASE_HOME/conf

1.vim hbase-env.sh

27行左右配置JAVA_HOME
在这里插入图片描述
4六、47行左右注释掉,JDK1.7须要使用,咱们使用的JDK1.8,因此不须要,注释掉,否则启动Hbase会有警告,看着‘烦’
在这里插入图片描述
128行左右取消注释,修改true为false,使用咱们本身Zookeeper
在这里插入图片描述

2.vim hbase-site.xml

根据本身环境添加以下代码↓↓↓

<configuration>
        <!-- Hbase在HDFS上的根目录 -->
        <property>
                <name>hbase.rootdir</name>
                <value>hdfs://node01:8020/hbase</value>
        </property>
    <!-- Hbase集群模式,false表示hbase单机模式,true表示是分布式模式,默认false-->
        <property>
                <name>hbase.cluster.distributed</name>
                <value>true</value>
        </property>

   <!-- 0.98后的新变更,以前版本没有.port,默认端口为60000 -->
   <!-- hbase master节点端口 -->
        <property>
                <name>hbase.master.port</name>
                <value>16000</value>
        </property>
   <!-- 添加zookeeper节点 -->
        <property>
                <name>hbase.zookeeper.quorum</name>
                <value>node01:2181,node02:2181,node03:2181</value>
        </property>
   <!-- 配置hbase的数据在zookeeper上存储的目录,也就是Zookeeper的myid所在的目录 -->
        <property>
                <name>hbase.zookeeper.property.dataDir</name>
                <value>/export/servers/zookeeper/zkdata</value>
        </property>
</configuration>

3.vim regionservers

添加RegionServer节点主机名
在这里插入图片描述

4.vim backup-masters(配置备份master,此文件默认不存在)

添加Master备份节点主机名
在这里插入图片描述

5.软链接Hadoop配置到Hbase下

由于Hbase基于HDFS,因此须要读取Hadoop配置

ln -s $HADOOP_HOME/etc/hadoop/core-site.xml $HBASE_HOME/conf/core-site.xml
ln -s $HADOOP_HOME/etc/hadoop/hdfs-site.xml $HBASE_HOME/conf/hdfs-site.xml

6.分发安装包和环境变量

scp -r hbase node02:`pwd`
scp -r hbase node03:`pwd`
scp hbase.sh node02:`pwd`
scp hbase.sh node03:`pwd`

注意:
分发完记得  source /etc/profile

7.启动并使用JPS查看

在主节点启动时记得免密钥哦!

启动hbase集群:
start-hbase.sh
查看
jps

若是三台节点都有 HMaster 和 HRegionServer 表示启动成功
也能够经过访问WEB页面 主机名:60010 来查看
或使用 hbase shell命令进入 hbase shell窗口 使用 list 命令查看

7、HBASE配置文件详解

  • backup-masters - 这是一个纯文本文件,其中列出了主服务器应在其上启动备份主进程的主机列表,每行一台主机名(默认状况下不存在)
  • hadoop-metric2-hbase.properties - 用于链接HBase和Hadoop的Metric2框架,默认状况下只包含注释出的示例
  • hbase-env.cmd 和 hbase-env.sh - 用于Windows和Linux/Unix环境的脚本,用于设置HBase的工做环境,包括 JAVA和其余环境变量的配置(该文件包含许多注释示例来提供指导,该文件的改动须要重启HBase才能生效)
  • hbase-policy.xml - RPC服务器使用默认策略配置文件对客户端请求进行受权决策(仅在启用HBase安全性的状况下使用)
  • hbase-site.xml - 主要的HBase配置文件,该文件覆盖了HBase的默认配置的配置选项(能够在HBase Web UI的HBase配置选项中查看整个集群的有效配置,包括默认和覆盖的)
  • log4j.properties - HBase日志记录的配置文件
  • regionservers - 纯文本文件,包含在HBase集群中运行RegionSever的主机名或IP列表,每行一个(默认状况下,这个文件包含单条条目 localhost)

8、HBASE Shell

  • 1.进入HBase客户端命令操做界面
hbase shell
  • 2.查看帮助命令
help
  • 3.统计一张表有多少行数据
count 'table'
  • 4.显示服务器状态
status '主机名'
  • 5.显示HBase当前用户
whoami
  • 6.检查表是否存在,适用于表量特别多的状况
exists 'table'
  • 7.检查表是否启用或禁用
is_enabled 'table' 
	is_disabled	'table'
  • 8.启用一张表/禁用一张表
enable 'table' 
	disable 'table'

DDL:

增
	1.建立表
		create 'table','columnFamily'...'columnFamily'...

	2.建立命名空间
		create_namespace '命名空间名' 	

	2.建立表并指定版本数
		create 'table',{NAME => 'columnFamily',VERSIONS => num}...

删
	1.删除表(先禁用,再删除)
		(1).disable 'table'	
		(2).drop 'table'

	2.删除命名空间(先禁用命名空间下全部表,再删除全部表,最后删除命名空间)
		(1).disable_all '命名空间名.*'
		(2).drop_all '命名空间名.*'
		(3).drop_namespace '命名空间名'	

改
	1.添加列族/修改列族版本数
	  默认VERSION版本
	    alter 'table','columnFamily'...
	  指定VERSION版本
		alter 'table',{NAME => 'columnFamily',VERSIONS => num}...
		
查
	1.查看全部表
		list

	2.查看表结构信息
		desc(describe) 'table'

	3.查看全部命名空间
		list_namespace

DML:

增/改
	1.表添加/修改数据
		put 'table','rowkey','columnFamily:column','value',(timestamp)

删
	1.表删除列
		delete 'table','rowkey','columnFamily:column',(timestamp)

	2.表删除列族
		(1).delete 'table','rowkey','columnFamily' 
		(2).alter 'table', NAME => 'columnFamily', METHOD => 'delete' 
			alter 'table', 'delete' => 'columnFamily'

	3.表删除rowkey
		(1).delete 'table','rowkey'
		(2).deleteall 'table','rowkey'

	4.清空表数据
		truncate 'table'

查
	1.查询表的全部数据
		scan 'table'

	2.根据 列族/列名 查询表的数据	
		scan 'table',{COLUMN => 'columnFamily:column'}
		scan 'table', {COLUMNS => ['columnFamily:column'...],VERSIONS => num}	

	3.根据 rowkey 查询指定范围(左闭右开)	
		scan 'table',{STARTROW => 'rowkey',ENDROW => 'rowkey'}

	4.查询版本号为num之内的表的全部数据
		scan 'table',{RAW => true , VERSIONS => num}
		scan 'table',{COLUMNs => 'columnFamily',RAW => true , VERSIONS => num}	
	
	5.scan模糊查询
		scan 'table',{FILTER=>"PrefixFilter('startStr')"}
		scan 'table',{TIMERANGE => [时间戳1, 时间戳2]}

	6.get查询	
		(1).get 'table','rowkey'
		(2).get 'table','rowkey','columnFamily'...
		(3).get 'table','rowkey','columnFamily:column'...
		(4).get 'table','rowkey',{COLUMN => 'columnFamily:column',VERSIONS => num}
		(5).get 'table','rowkey',{FILTER => "ValueFilter(=, 'binary:str')"}

9、HBASE API

1.API基本操做

建立Maven工程,导入jar包
<repositories>
    <repository>
        <id>cloudera</id>
        <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
</repositories>
<dependencies>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.6.0-mr1-cdh5.14.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>1.2.0-cdh5.14.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-server</artifactId>
        <version>1.2.0-cdh5.14.0</version>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.testng</groupId>
        <artifactId>testng</artifactId>
        <version>6.14.3</version>
        <scope>test</scope>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.0</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
                <encoding>UTF-8</encoding>
                <!-- <verbal>true</verbal>-->
            </configuration>
        </plugin>
        <!--将咱们其余用到的一些jar包所有都打包进来 -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>2.4.3</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <minimizeJar>false</minimizeJar>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
(1)HBaseAPI基本操做
public class demo01 {

    private static Connection connection = null;
    private static Admin admin = null;

	//初始化配置
    static {
        try {
            //1.获取配置文件信息
            Configuration configuration = HBaseConfiguration.create();
            //默认2181,可写
            configuration.set("hbase.zookeeper.quorum", "node01,node02,node03");
            //2.获取管理员对象
            connection = ConnectionFactory.createConnection(configuration);
            admin = connection.getAdmin();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    //关闭资源
    public static void close() {
        if (admin != null) {
            try {
                admin.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        if (connection != null) {
            try {
                connection.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws IOException {
        //1.测试表是否存在
        System.out.println(isTableExist("ccc"));
        //2.建立表
        createTable("ccc","info1","info2");
        //3.删除表
        dropTable("ccc");
        //4.建立命名空间
        createNameSpace("test");
        //5.插入数据
        putData("ccc","1001","info1","name","zhangsan");
        //6.获取数据
        getData("ccc","1000","info1","name");
        //7.

        //8.
        //关闭资源
        close();
    }

    //1.判断表是否存在
    public static boolean isTableExist(String tableName) throws IOException {
        //1.判断表是否存在
        boolean exists = admin.tableExists(TableName.valueOf(tableName));
        return exists;
    }

    //2.建立表
    public static void createTable(String tableName, String...cfs) throws IOException {
        //1.判断是否存在列族信息
        if (cfs.length <= 0) {
            System.out.println("请设置列族信息");
            return;
        }
        //2.判断表是否存在
        if (isTableExist(tableName)) {
            System.out.println(tableName + "表已存在!");
            return;
        }
        //3.建立表描述器
        HTableDescriptor hTableDescriptor = new HTableDescriptor(TableName.valueOf(tableName));
        //4.循环添加列族信息
        for (String cf : cfs) {
            //5.建立列族描述器
            HColumnDescriptor hColumnDescriptor = new HColumnDescriptor(cf);
            //6.添加具体的列族信息
            hTableDescriptor.addFamily(hColumnDescriptor);
        }
        //7.建立表
        admin.createTable(hTableDescriptor);
    }

    //3.删除表
    public static void dropTable(String tableName) throws IOException {
        //1.判断表是否存在
        if (!isTableExist(tableName)) {
            System.out.println(tableName + "表不存在!");
            return;
        }
        TableName table_Name = TableName.valueOf(tableName);
        //2.使表下线
        admin.disableTable(table_Name);
        //3.删除表
        admin.deleteTable(table_Name);
    }

    //4.建立命名空间
    public static void createNameSpace(String ns) {
        //1.建立命名空间描述器
        NamespaceDescriptor.Builder builder = NamespaceDescriptor.create(ns);
        NamespaceDescriptor namespaceDescriptor = builder.build();
        //2.建立命名空间
        try {
            admin.createNamespace(namespaceDescriptor);
        } catch (NamespaceExistException e) {
            System.out.println(ns + "命名空间已存在!");
        } catch (IOException e) {
            e.printStackTrace();
        }
        System.out.println("哈哈哈,命名空间尽管存在,我仍是能够走到这!!!");
    }

    //5.插入数据
    public static void putData(String tableName,String rowKey,String cf,String cn,String value) throws IOException {
        //1.获取表对象
        Table table = connection.getTable(TableName.valueOf(tableName));
        //2.建立put对象
        Put put = new Put(Bytes.toBytes(rowKey));
        //3.给put对象赋值
        put.addColumn(Bytes.toBytes(cf),Bytes.toBytes(cn),Bytes.toBytes(value));
        //4.插入数据
        table.put(put);
        //5.关闭表链接
        table.close();
    }

    //6.获取数据
    public static void getData(String tableName,String rowKey,String cf,String cn) throws IOException {
        //1.获取表对象
        Table table = connection.getTable(TableName.valueOf(tableName));
        //2.建立get对象
        Get get = new Get(Bytes.toBytes(rowKey));
        //2.1指定获取的列族
		//get.addFamily(Bytes.toBytes(cf));
        //2.2指定获取的列
        get.addColumn(Bytes.toBytes(cf),Bytes.toBytes(cn));
        //2.3设置获取数据的版本数
        get.setMaxVersions();

        //3.获取数据
        Result result = table.get(get);
        //4.解析result并打印
        Cell[] cells = result.rawCells();
        for (Cell cell : cells) {
            //5.打印数据
            System.out.println("CF:"+Bytes.toString(CellUtil.cloneFamily(cell))
            +",CN:"+Bytes.toString(CellUtil.cloneQualifier(cell))
            +",Value:"+Bytes.toString(CellUtil.cloneValue(cell)));
        }
        //6.关闭表链接
        table.close();
    }
}

2.API之拦截器查询

1)过滤器的类型不少,可是能够分为两大类——比较过滤器,专用过滤器
过滤器的做用是在服务端判断数据是否知足条件,而后只将知足条件的数据返回给客户端

2)HBase过滤器的比较运算符:

LESS  <
LESS_OR_EQUAL <=
EQUAL =
NOT_EQUAL <>
GREATER_OR_EQUAL >=
GREATER >
NO_OP 排除全部

3)Hbase过滤器的专用过滤器(指定比较机制):

BinaryComparator  按字节索引顺序比较指定字节数组,采用Bytes.compareTo(byte[])
BinaryPrefixComparator 跟前面相同,只是比较左端的数据是否相同
NullComparator 判断给定的是否为空
BitComparator 按位比较
RegexStringComparator 提供一个正则的比较器,仅支持 EQUAL 和非EQUAL
SubstringComparator 判断提供的子串是否出如今value中

4)rowKey过滤器RowFilter

需求:经过RowFilter过滤比rowKey 0003小的全部值出来

/**
     * hbase行键过滤器RowFilter
     */
    @Test
    public  void rowKeyFilter() throws IOException {
        //获取Configuration
        Configuration configuration = HBaseConfiguration.create();
        //设置Zookeeper集群
        configuration.set("hbase.zookeeper.quorum","node01:2181,node02:2181,node03:2181");
        //获取链接Connection
        Connection connection = ConnectionFactory.createConnection(configuration);
        //获取表
        Table myuser = connection.getTable(TableName.valueOf("myuser"));
		//建立Scan对象
        Scan scan = new Scan();
        //建立RowKye过滤器
        RowFilter rowFilter = new RowFilter(CompareFilter.CompareOp.LESS_OR_EQUAL, new BinaryComparator(Bytes.toBytes("0003")));
		//scan对象添加过滤器
        scan.setFilter(rowFilter);
        //获取ResultScanner对象
        ResultScanner resultScanner = myuser.getScanner(scan);
        //遍历循环打印
        for (Result result : resultScanner) {
            //获取rowkey
            System.out.println(Bytes.toString(result.getRow()));
            //指定列族以及列打印列当中的数据出来
            System.out.println(Bytes.toInt(result.getValue("f1".getBytes(), "id".getBytes())));
            System.out.println(Bytes.toInt(result.getValue("f1".getBytes(), "age".getBytes())));
            System.out.println(Bytes.toString(result.getValue("f1".getBytes(), "name".getBytes())));
        }
        //关闭资源
        myuser.close();
        connection.close();
    }

5)列族过滤器FamilyFilter

需求:查询比f2列族小的全部的列族内的数据

/**
     * hbase列族过滤器FamilyFilter
     */
    @Test
    public  void familyFilter() throws IOException {
        //获取Configuration
        Configuration configuration = HBaseConfiguration.create();
        //设置Zookeeper集群
        configuration.set("hbase.zookeeper.quorum","node01:2181,node02:2181,node03:2181");
        //获取链接Connection
        Connection connection = ConnectionFactory.createConnection(configuration);
        //获取表
        Table myuser = connection.getTable(TableName.valueOf("myuser"));
		//建立Scan对象
        Scan scan = new Scan();
        //建立列族过滤器
        FamilyFilter familyFilter = new FamilyFilter(CompareFilter.CompareOp.LESS, new SubstringComparator("f2"));
        //scan对象添加过滤器
        scan.setFilter(familyFilter);
        //获取ResultScanner对象
        ResultScanner resultScanner = myuser.getScanner(scan);
         //遍历循环打印
        for (Result result : resultScanner) {
            //获取rowkey
            System.out.println(Bytes.toString(result.getRow()));
            //指定列族以及列打印列当中的数据出来
            System.out.println(Bytes.toInt(result.getValue("f1".getBytes(), "id".getBytes())));
            System.out.println(Bytes.toInt(result.getValue("f1".getBytes(), "age".getBytes())));
            System.out.println(Bytes.toString(result.getValue("f1".getBytes(), "name".getBytes())));
        }
        //关闭资源
        myuser.close();
        connection.close();
    }

6)列过滤器QualifierFilter

需求:只查询name列的值

/**
     * hbase列过滤器
     */
    @Test
    public  void qualifierFilter() throws IOException {
        //获取Configuration
        Configuration configuration = HBaseConfiguration.create();
        //设置Zookeeper集群
        configuration.set("hbase.zookeeper.quorum","node01:2181,node02:2181,node03:2181");
        //获取链接Connection
        Connection connection = ConnectionFactory.createConnection(configuration);
        //获取表
        Table myuser = connection.getTable(TableName.valueOf("myuser"));
		//建立Scan对象
        Scan scan = new Scan();
        //建立列过滤器
        QualifierFilter= new QualifierFilter(CompareFilter.CompareOp.EQUAL, new SubstringComparator("name"));
        //scan对象添加过滤器
        scan.setFilter(qualifierFilter);
        //获取ResultScanner对象
        ResultScanner resultScanner = myuser.getScanner(scan);
        //遍历循环打印
        for (Result result : resultScanner) {
            //获取rowkey
            System.out.println(Bytes.toString(result.getRow()));
            //指定列族以及列打印列当中的数据出来
        //    System.out.println(Bytes.toInt(result.getValue("f1".getBytes(), "id".getBytes())));
            System.out.println(Bytes.toString(result.getValue("f1".getBytes(), "name".getBytes())));
        }
        //关闭资源
        myuser.close();
        connection.close();
    }

7)列值过滤器ValueFilter

需求:查询全部列当中包含8的数据

/**
     * hbase值过滤器
     * 查询包含8的列值
     */
    @Test
    public  void valueFilter() throws IOException {
        //获取Configuration
        Configuration configuration = HBaseConfiguration.create();
        //设置Zookeeper集群
        configuration.set("hbase.zookeeper.quorum","node01:2181,node02:2181,node03:2181");
        //获取链接Connection
        Connection connection = ConnectionFactory.createConnection(configuration);
        //获取表
        Table myuser = connection.getTable(TableName.valueOf("myuser"));
		//建立Scan对象
        Scan scan = new Scan();
        //建立列值过滤器
        ValueFilter valueFilter = new ValueFilter(CompareFilter.CompareOp.EQUAL, new SubstringComparator("8"));
		//scan对象添加过滤器
        scan.setFilter(valueFilter);
        //获取ResultScanner对象
        ResultScanner resultScanner = myuser.getScanner(scan);
        //遍历循环打印
        for (Result result : resultScanner) {
            //获取rowkey
            System.out.println(Bytes.toString(result.getRow()));
            //指定列族以及列打印列当中的数据出来
            //    System.out.println(Bytes.toInt(result.getValue("f1".getBytes(), "id".getBytes())));
            System.out.println(Bytes.toString(result.getValue("f2".getBytes(), "phone".getBytes())));
        }
        //关闭资源
        myuser.close();
        connection.close();
    }

8)单列值过滤器 SingleColumnValueFilter

需求:SingleColumnValueFilter会返回知足条件的整列值的全部字段

/**
     * 单列值过滤器,返回知足条件的整行数据
     */
    @Test
    public void singleColumnFilter() throws IOException {
        //获取Configuration
        Configuration configuration = HBaseConfiguration.create();
        //设置Zookeeper集群
        configuration.set("hbase.zookeeper.quorum","node01:2181,node02:2181,node03:2181");
        //获取链接Connection
        Connection connection = ConnectionFactory.createConnection(configuration);
        //获取表
        Table myuser = connection.getTable(TableName.valueOf("myuser"));
		//建立Scan对象
        Scan scan = new Scan();
        //建立单列值过滤器
        SingleColumnValueFilter singleColumnValueFilter = new SingleColumnValueFilter("f1".getBytes(), "name".getBytes(), CompareFilter.CompareOp.EQUAL, "刘备".getBytes());
        //scan对象添加过滤器
        scan.setFilter(singleColumnValueFilter);
        //获取ResultScanner对象
        ResultScanner resultScanner = myuser.getScanner(scan);
        //遍历循环打印
        for (Result result : resultScanner) {
            //获取rowkey
            System.out.println(Bytes.toString(result.getRow()));
            //指定列族以及列打印列当中的数据出来
            System.out.println(Bytes.toInt(result.getValue("f1".getBytes(), "id".getBytes())));
            System.out.println(Bytes.toString(result.getValue("f1".getBytes(), "name".getBytes())));
            System.out.println(Bytes.toString(result.getValue("f2".getBytes(), "phone".getBytes())));
        }
        //关闭资源
        myuser.close();
        connection.close();
    }

9)列值排除过滤器SingleColumnValueExcludeFilter
与SingleColumnValueFilter相反,会排除掉指定的列,其余的列所有返回

10)rowkey前缀过滤器PrefixFilter

需求:查询以00开头的全部前缀的rowkey

/**
     * 行键前缀过滤器
     */
    @Test
    public void preFilter() throws IOException {
        //获取Configuration
        Configuration configuration = HBaseConfiguration.create();
        //设置Zookeeper集群
        configuration.set("hbase.zookeeper.quorum","node01:2181,node02:2181,node03:2181");
        //获取链接Connection
        Connection connection = ConnectionFactory.createConnection(configuration);
        //获取表
        Table myuser = connection.getTable(TableName.valueOf("myuser"));
		//建立Scan对象
        Scan scan = new Scan();
        //建立rowKey前缀过滤器
        PrefixFilter prefixFilter = new PrefixFilter("00".getBytes());
        //scan对象添加过滤器
        scan.setFilter(prefixFilter);
        //获取ResultScanner对象
        ResultScanner resultScanner = myuser.getScanner(scan);
        //遍历循环打印
        for (Result result : resultScanner) {
            //获取rowkey
            System.out.println(Bytes.toString(result.getRow()));
            //指定列族以及列打印列当中的数据出来
            System.out.println(Bytes.toInt(result.getValue("f1".getBytes(), "id".getBytes())));
            System.out.println(Bytes.toString(result.getValue("f1".getBytes(), "name".getBytes())));
            System.out.println(Bytes.toString(result.getValue("f2".getBytes(), "phone".getBytes())));
        }
        //关闭资源
        myuser.close();
        connection.close();
    }

11)过滤器综合查询FilterList

需求:使用SingleColumnValueFilter查询f1列族,name为刘备的数据,而且同时知足rowkey的前缀以00开头的数据(PrefixFilter)

/**
     * 多过滤器组合使用
     */
    @Test
    public void manyFilter() throws IOException {
        //获取Configuration
        Configuration configuration = HBaseConfiguration.create();
        //设置Zookeeper集群
        configuration.set("hbase.zookeeper.quorum","node01:2181,node02:2181,node03:2181");
        //获取链接Connection
        Connection connection = ConnectionFactory.createConnection(configuration);
        //获取表
        Table myuser = connection.getTable(TableName.valueOf("myuser"));
		//建立Scan对象
        Scan scan = new Scan();
        //建立过滤器集合
        FilterList filterList = new FilterList();
		//建立单列值过滤器
        SingleColumnValueFilter singleColumnValueFilter = new SingleColumnValueFilter("f1".getBytes(), "name".getBytes(), CompareFilter.CompareOp.EQUAL, "刘备".getBytes());
        //建立rowKey前缀过滤器
        PrefixFilter prefixFilter = new PrefixFilter("00".getBytes());
        //过滤器集合添加过滤器
        filterList.addFilter(singleColumnValueFilter);
        filterList.addFilter(prefixFilter);
        //scan设置过滤器集合
        scan.setFilter(filterList);
        //获取ResultScanner对象
        ResultScanner scanner = myuser.getScanner(scan);
        //遍历循环打印
        for (Result result : scanner) {
            //获取rowkey
            System.out.println(Bytes.toString(result.getRow()));
            //指定列族以及列打印列当中的数据出来
//            System.out.println(Bytes.toInt(result.getValue("f1".getBytes(), "id".getBytes())));
            System.out.println(Bytes.toString(result.getValue("f1".getBytes(), "name".getBytes())));
            //System.out.println(Bytes.toString(result.getValue("f2".getBytes(), "phone".getBytes())));
        }
        //关闭资源
        myuser.close();
        connection.close();
    }

10、数据处理

public class VideoDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration configuration = new Configuration();
        FileSystem fs = FileSystem.get(configuration);
        Job job = Job.getInstance(configuration,"Video3");
        //inputformat
        Path input = new Path("C:\\Users\\12419\\Desktop\\练习题\\video.txt");
        FileInputFormat.addInputPath(job,input);
        //outputformat
        Path output = new Path("C:\\Users\\12419\\Desktop\\练习题\\videoEnd");
        if(fs.exists(output))fs.delete(output,true);
        FileOutputFormat.setOutputPath(job,output);
        //mapper \  keyout \ valueout
        job.setMapperClass(VideoMapper.class);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(NullWritable.class);
        //reducer \ keyout \ valueout
        //submit
        job.waitForCompletion(true);
    }

    public static class VideoMapper extends Mapper<LongWritable, Text, NullWritable,NullWritable> {

        static FileSystem fs;
        static FSDataOutputStream fsDataOutputStream;

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            fs = FileSystem.get(context.getConfiguration());
            fsDataOutputStream = fs.create(new Path("C:\\Users\\12419\\Desktop\\练习题\\练习题3\\video"));
        }
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String[] datas = line.split(":");
            if(datas.length < 10){
                return;
            }
            StringBuilder sb = new StringBuilder();
            for (int i = 0; i < datas.length; i++) {
                if(i == 3 && datas[i].contains(" & ")){
                    sb.append(datas[i].replace(" & ", ",")).append("&");
                    continue;
                }
                if(i < 9){
                    sb.append(datas[i]).append("&");
                }else if(i == datas.length - 1){
                    sb.append(datas[i]+"\n");
                }else {
                    sb.append(datas[i]).append(",");
                }
            }
            fsDataOutputStream.write(sb.toString().getBytes());
        }
        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            fsDataOutputStream.close();
        }
    }
}

//        @Override
//        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//            String line = value.toString();
//            String[] datas = line.split(":");
//            try {
//                if(datas[3].contains(" & ")){
//                    String str1 = line.substring(0,line.indexOf(datas[3]));
//                    String str2 = datas[3].replace(" & ", ",");
//                    String str3 = line.substring(str1.length() + datas[3].length());
//                    line = str1 + str2 + str3;
//                    datas = line.split(":");
//                }
//            }catch (ArrayIndexOutOfBoundsException e){
//                return;
//            }
//            if(datas.length < 10){
//                return;
//            }
//            int index = line.indexOf(datas[9]);
//            String str = line.substring(0,index) + line.substring(index).replace(":",",") + "\n";
//            str = str.replace(":", "&");
//            fsDataOutputStream.write(str.getBytes());
//        }

11、HBASE集成MapReduce

Driver:
	TableMapReduceUtil.initTableMapperJob
	TableMapReduceUtil.initTableReducerJob
	
Mapper:TableMapper<KEYOUT, VALUEOUT>
Reducer:TableReducer<KEYIN, VALUEIN, KEYOUT>

12、HBASE集成Hive

1.建立Hive内部表的同时建立Hbase表
create table hive_hbase_emp(
    id INT,
    username STRING,
    password STRING)
stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'  
with serdeproperties("hbase.columns.mapping" = ":key,info:username,info:password") 
tblproperties("hbase.table.name" = "hbase_emp");

2.建立Hive外部表与Hbase中存在的表作关联
create external table hive_external_hbase_emp(
    id INT,
    username STRING,
    password STRING)
stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'  
with serdeproperties("hbase.columns.mapping" = ":key,info:username,info:password") 
tblproperties("hbase.table.name" = "hbase_emp");