1.1 什么是HBasenode
1)HBASE译为“Hadoop Database”,是一个高可靠性、高性能、列存储、可伸缩、实时读写的NoSQL数据库系统,利用HBASE技术可在廉价PC Server上搭建起大规模结构化存储集群
2)主要用来存储结构化和半结构化的松散数据
3)HBASE是Google Bigtable的开源实现,可是也有不少不一样之处。好比:Google Bigtable使用GFS做为其文件存储系统,HBASE利用Hadoop HDFS做为其文件存储系统;Google运行MAPREDUCE来处理Bigtable中的海量数据,HBASE一样利用Hadoop MapReduce来处理HBASE中的海量数据;Google Bigtable利用Chubby做为协同服务,HBASE利用Zookeeper做为协同服务
4)HBASE的目标是存储并处理大型的数据,更具体来讲是仅需使用普通的硬件配置,就可以处理由成千上万的行和列所组成的大型数据web
1.2 与传统数据库的对比shell
1)传统数据库遇到的问题:数据库
2)HBASE优点:apache
1.3 HBase集群中的角色vim
1)一个或者多个主节点 : HMaster
2)多个从节点 : HRegionServer
3)HBase依赖项 : Zookeeper,HDFS数组
2.1 ROW KEY
1)访问hbase table中的行,只有三种方式:缓存
2)Hbase会对表中的数据按照Row key排序(字典顺序)
3)Row key只能存储64k的字节数据(实际应用中长度通常为 10-100bytes),在Hbase内部,Row key保存形式为字节数组安全
2.2 Column Family列族 & Column列
1)HBase表中的每一个列都归属于某个列族,列族必须做为表模式(schema)定义的一部分预先给出。如 create ‘表名’, ‘列族名’
2)列名以列族做为前缀,每一个“列族”均可以有多个列成员(column);如course:math, course:english, 新的列族成员(列)能够随后按需、动态的加入
3)权限控制、存储以及调优都是在列族层面进行的
4)HBase把同一列族里面的数据存储在同一目录下,由几个文件保存bash
2.3 Timestamp时间戳
1)在HBase每一个cell存储单元对同一份数据有多个版本,根据惟一的时间戳来区分每一个版本之间的差别,不一样版本的数据按照时间倒序排序,最新的数据版本排在最前面
2)时间戳的类型是 64位整型
3)时间戳能够由HBase(在数据写入时自动)赋值,此时时间戳是精确到毫秒的当前系统时间
4)时间戳也能够由客户显式赋值,若是应用程序要避免数据版本冲突,就必须本身生成具备惟一性的时间戳
5)为了不数据存在过多版本形成的的管理 (包括存贮和索引)负担,hbase提供了两种数据版本回收方式:
6)用户能够针对每一个列族进行设置
2.4 Cell单元格
1)由 RowKey 和 "ColumnFamily:Column"的坐标交叉决定
2)单元格是有版本的
3)单元格的内容是未解析的字节数组, 由 {row key, column( =<family> +<qualifier>), version}
惟一肯定的单元;cell中的数据是没有类型的,所有是字节码形式存贮
2.5 VersionNum
1)数据的版本号,每条数据能够有多个版本号,默认值为系统时间戳,类型为Long
2.6 Region
1)HBase自动把表水平划分红多个区域(region),每一个region会保存一个表里面某段连续的数据;每一个表一开始只有一个region,随着数据不断插入表,region不断增大,当增大到一个阀值的时候,region就会等分会两个新的region(裂变)
2)当table中的行不断增多,就会有愈来愈多的region。这样一张完整的表被切分红多个Region,保存在多个RegionServer上
3)Region是负载均衡的最小单元,最小单元就表示不一样的Region能够分布在不一样的HRegion server上。但一个Region是不会拆分到多个RegionServer上的
4)Region虽然是负载均衡的最小单元,但并非物理存储的最小单元;
事实上,Region由一个或者多个Store组成,每一个Store保存一个column family;
每一个Strore又由一个MemStore和0至多个StoreFile组成
3.1 Client
1)Client包含了访问Hbase的接口,另外Client还维护了对应的cache来加速Hbase的访问,好比cache的.META.
元数据的信息
3.2 Zookeeper
1)保证任什么时候候,集群中只有一个Master,若是Master异常,会经过竞争机制产生新的Master提供服务
2)存贮全部Region的寻址入口
3)实时监控RegionServer的状态,将RegionServer的上线和下线信息实时通知给Master
4)存储HBase的schema和table元数据信息
3.3 Master
1)为RegionServer分配Region
2)负责RegionServer的负载均衡
3)发现失效的RegionServer并从新分配其上的Region
4)处理对schema更新请求
3.4 RegionServer
1)RegionServer维护Master分配给它的Region,处理对Region的IO请求
2)RegionServer负责切分在运行过程当中变得过大的Region;能够看到,Client访问Hbase上数据的过程并不须要Master参与(寻址访问Zookeeper和RegionServer,数据读写访问RegioneServer),Master仅仅维护者table和Region的元数据信息,负载很低
3)负责和底层HDFS的交互,存储数据到HDFS
4)负责Storefile的合并工做
5.1 写数据
1)Client 访问 Zookeeper,获取 hbase:meta表位于哪一个 RegionServer 2)访问对应的RegionServer,获取hbase:meta表,根据rowkey肯定当前将要写入的数据所对应的RegionServer 服务器和Region,并将该table的Region信息以及meta表的位置信息缓存在客户端的 Meta cache,方便下次访问 3)Client与目标RegionServer进行通信,发起写入数据请求 4)Client将数据写入(追加)到HLog(WAL,Write ahead log,预写日志),以防止数据丢失 5)而后将数据写入对应的MemStore,数据会在MemStore内进行排序 6)若是HLog和Memstore都写入成功,则这条数据写入成功,向客户端发送消息,告知写入成功,若是其中一个写入失败,就表示此次写入失败 7)等达到MemStore的阈值后,将数据flush到磁盘成为StoreFile,当StoreFile达到阈值(默认3个)后进行compact操做,将多个StoreFile合并成一个大StoreFile
1)此过程没有Master的参与
2)客户端在写数据时会先去查看本地的meta cache,若是有以前的缓存数据,直接经过缓存元数据访问 RegionServer,这时不经过Zookeeper获取meta表去查找元数据,。若是集群发生某些变化致使hbase:meta元数据更改,客户端再根据本地元数据表请求的时候就会发生异常,此时客户端须要从新加载一份最新的元数据表到本地
3)MemStore触发时机
5.2 读数据
1)Client 访问 Zookeeper,获取 hbase:meta表位于哪一个 RegionServer 2)访问对应的RegionServer,获取hbase:meta表,根据rowkey肯定当前将要读取的数据位于哪一个RegionServer 中的哪一个Region中。并将该table的region信息以及meta表的位置缓存在客户端 meta cache,方便下次访问 3)Client向该RegionServer服务器发起读取数据请求,而后RegionServer收到请求并响应 4)分别在BlockCache(读缓存)、MemStore和StoreFile(BlockCache中有的数据 在StoreFile中就再也不读取)中查询目标数据,并将查到的全部数据进行合并; 此处的全部数据指的是同一条数据的不一样版本(Time Stamp)或者不一样的类型(Put/Delete) 5)将从文件中查询到的数据块(Block,HFile数据存储单元,默认大小为64KB)缓存到Block Cache 6)将合并后的最终结果返回给客户端
1)此过程没有Master的参与
2)客户端在读数据时会先去查看本地的meta cache,若是有以前的缓存数据,直接经过缓存元数据访问 RegionServer,这时不经过Zookeeper获取meta表去查找元数据,。若是集群发生某些变化致使hbase:meta元数据更改,客户端再根据本地元数据表请求的时候就会发生异常,此时客户端须要从新加载一份最新的元数据表到本地
前提:Hadoop集群要启动正常,Zookeeper集群启动正常
1.解压tar包
tar -zxf hbase-1.2.0-cdh5.14.0.tar.gz ../servers/hbase
2.配置环境变量(根据本身环境配置)
vim /etc/profile.d/hbase.sh export HBASE_HOME=/export/servers/hbase export PATH=$PATH:$HBASE_HOME/bin source /etc/profile
3.修改配置文件
cd $HBASE_HOME/conf
27行左右配置JAVA_HOME
4六、47行左右注释掉,JDK1.7须要使用,咱们使用的JDK1.8,因此不须要,注释掉,否则启动Hbase会有警告,看着‘烦’
128行左右取消注释,修改true为false,使用咱们本身Zookeeper
根据本身环境添加以下代码↓↓↓
<configuration> <!-- Hbase在HDFS上的根目录 --> <property> <name>hbase.rootdir</name> <value>hdfs://node01:8020/hbase</value> </property> <!-- Hbase集群模式,false表示hbase单机模式,true表示是分布式模式,默认false--> <property> <name>hbase.cluster.distributed</name> <value>true</value> </property> <!-- 0.98后的新变更,以前版本没有.port,默认端口为60000 --> <!-- hbase master节点端口 --> <property> <name>hbase.master.port</name> <value>16000</value> </property> <!-- 添加zookeeper节点 --> <property> <name>hbase.zookeeper.quorum</name> <value>node01:2181,node02:2181,node03:2181</value> </property> <!-- 配置hbase的数据在zookeeper上存储的目录,也就是Zookeeper的myid所在的目录 --> <property> <name>hbase.zookeeper.property.dataDir</name> <value>/export/servers/zookeeper/zkdata</value> </property> </configuration>
添加RegionServer节点主机名
添加Master备份节点主机名
由于Hbase基于HDFS,因此须要读取Hadoop配置
ln -s $HADOOP_HOME/etc/hadoop/core-site.xml $HBASE_HOME/conf/core-site.xml ln -s $HADOOP_HOME/etc/hadoop/hdfs-site.xml $HBASE_HOME/conf/hdfs-site.xml
scp -r hbase node02:`pwd` scp -r hbase node03:`pwd` scp hbase.sh node02:`pwd` scp hbase.sh node03:`pwd` 注意: 分发完记得 source /etc/profile
在主节点启动时记得免密钥哦! 启动hbase集群: start-hbase.sh 查看 jps 若是三台节点都有 HMaster 和 HRegionServer 表示启动成功 也能够经过访问WEB页面 主机名:60010 来查看 或使用 hbase shell命令进入 hbase shell窗口 使用 list 命令查看
hbase shell
help
count 'table'
status '主机名'
whoami
exists 'table'
is_enabled 'table' is_disabled 'table'
enable 'table' disable 'table'
DDL:
增 1.建立表 create 'table','columnFamily'...'columnFamily'... 2.建立命名空间 create_namespace '命名空间名' 2.建立表并指定版本数 create 'table',{NAME => 'columnFamily',VERSIONS => num}... 删 1.删除表(先禁用,再删除) (1).disable 'table' (2).drop 'table' 2.删除命名空间(先禁用命名空间下全部表,再删除全部表,最后删除命名空间) (1).disable_all '命名空间名.*' (2).drop_all '命名空间名.*' (3).drop_namespace '命名空间名' 改 1.添加列族/修改列族版本数 默认VERSION版本 alter 'table','columnFamily'... 指定VERSION版本 alter 'table',{NAME => 'columnFamily',VERSIONS => num}... 查 1.查看全部表 list 2.查看表结构信息 desc(describe) 'table' 3.查看全部命名空间 list_namespace
DML:
增/改 1.表添加/修改数据 put 'table','rowkey','columnFamily:column','value',(timestamp) 删 1.表删除列 delete 'table','rowkey','columnFamily:column',(timestamp) 2.表删除列族 (1).delete 'table','rowkey','columnFamily' (2).alter 'table', NAME => 'columnFamily', METHOD => 'delete' alter 'table', 'delete' => 'columnFamily' 3.表删除rowkey (1).delete 'table','rowkey' (2).deleteall 'table','rowkey' 4.清空表数据 truncate 'table' 查 1.查询表的全部数据 scan 'table' 2.根据 列族/列名 查询表的数据 scan 'table',{COLUMN => 'columnFamily:column'} scan 'table', {COLUMNS => ['columnFamily:column'...],VERSIONS => num} 3.根据 rowkey 查询指定范围(左闭右开) scan 'table',{STARTROW => 'rowkey',ENDROW => 'rowkey'} 4.查询版本号为num之内的表的全部数据 scan 'table',{RAW => true , VERSIONS => num} scan 'table',{COLUMNs => 'columnFamily',RAW => true , VERSIONS => num} 5.scan模糊查询 scan 'table',{FILTER=>"PrefixFilter('startStr')"} scan 'table',{TIMERANGE => [时间戳1, 时间戳2]} 6.get查询 (1).get 'table','rowkey' (2).get 'table','rowkey','columnFamily'... (3).get 'table','rowkey','columnFamily:column'... (4).get 'table','rowkey',{COLUMN => 'columnFamily:column',VERSIONS => num} (5).get 'table','rowkey',{FILTER => "ValueFilter(=, 'binary:str')"}
<repositories> <repository> <id>cloudera</id> <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url> </repository> </repositories> <dependencies> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>2.6.0-mr1-cdh5.14.0</version> </dependency> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-client</artifactId> <version>1.2.0-cdh5.14.0</version> </dependency> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-server</artifactId> <version>1.2.0-cdh5.14.0</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> <scope>test</scope> </dependency> <dependency> <groupId>org.testng</groupId> <artifactId>testng</artifactId> <version>6.14.3</version> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.0</version> <configuration> <source>1.8</source> <target>1.8</target> <encoding>UTF-8</encoding> <!-- <verbal>true</verbal>--> </configuration> </plugin> <!--将咱们其余用到的一些jar包所有都打包进来 --> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>2.4.3</version> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <minimizeJar>false</minimizeJar> </configuration> </execution> </executions> </plugin> </plugins> </build>
public class demo01 { private static Connection connection = null; private static Admin admin = null; //初始化配置 static { try { //1.获取配置文件信息 Configuration configuration = HBaseConfiguration.create(); //默认2181,可写 configuration.set("hbase.zookeeper.quorum", "node01,node02,node03"); //2.获取管理员对象 connection = ConnectionFactory.createConnection(configuration); admin = connection.getAdmin(); } catch (IOException e) { e.printStackTrace(); } } //关闭资源 public static void close() { if (admin != null) { try { admin.close(); } catch (IOException e) { e.printStackTrace(); } } if (connection != null) { try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } } public static void main(String[] args) throws IOException { //1.测试表是否存在 System.out.println(isTableExist("ccc")); //2.建立表 createTable("ccc","info1","info2"); //3.删除表 dropTable("ccc"); //4.建立命名空间 createNameSpace("test"); //5.插入数据 putData("ccc","1001","info1","name","zhangsan"); //6.获取数据 getData("ccc","1000","info1","name"); //7. //8. //关闭资源 close(); } //1.判断表是否存在 public static boolean isTableExist(String tableName) throws IOException { //1.判断表是否存在 boolean exists = admin.tableExists(TableName.valueOf(tableName)); return exists; } //2.建立表 public static void createTable(String tableName, String...cfs) throws IOException { //1.判断是否存在列族信息 if (cfs.length <= 0) { System.out.println("请设置列族信息"); return; } //2.判断表是否存在 if (isTableExist(tableName)) { System.out.println(tableName + "表已存在!"); return; } //3.建立表描述器 HTableDescriptor hTableDescriptor = new HTableDescriptor(TableName.valueOf(tableName)); //4.循环添加列族信息 for (String cf : cfs) { //5.建立列族描述器 HColumnDescriptor hColumnDescriptor = new HColumnDescriptor(cf); //6.添加具体的列族信息 hTableDescriptor.addFamily(hColumnDescriptor); } //7.建立表 admin.createTable(hTableDescriptor); } //3.删除表 public static void dropTable(String tableName) throws IOException { //1.判断表是否存在 if (!isTableExist(tableName)) { System.out.println(tableName + "表不存在!"); return; } TableName table_Name = TableName.valueOf(tableName); //2.使表下线 admin.disableTable(table_Name); //3.删除表 admin.deleteTable(table_Name); } //4.建立命名空间 public static void createNameSpace(String ns) { //1.建立命名空间描述器 NamespaceDescriptor.Builder builder = NamespaceDescriptor.create(ns); NamespaceDescriptor namespaceDescriptor = builder.build(); //2.建立命名空间 try { admin.createNamespace(namespaceDescriptor); } catch (NamespaceExistException e) { System.out.println(ns + "命名空间已存在!"); } catch (IOException e) { e.printStackTrace(); } System.out.println("哈哈哈,命名空间尽管存在,我仍是能够走到这!!!"); } //5.插入数据 public static void putData(String tableName,String rowKey,String cf,String cn,String value) throws IOException { //1.获取表对象 Table table = connection.getTable(TableName.valueOf(tableName)); //2.建立put对象 Put put = new Put(Bytes.toBytes(rowKey)); //3.给put对象赋值 put.addColumn(Bytes.toBytes(cf),Bytes.toBytes(cn),Bytes.toBytes(value)); //4.插入数据 table.put(put); //5.关闭表链接 table.close(); } //6.获取数据 public static void getData(String tableName,String rowKey,String cf,String cn) throws IOException { //1.获取表对象 Table table = connection.getTable(TableName.valueOf(tableName)); //2.建立get对象 Get get = new Get(Bytes.toBytes(rowKey)); //2.1指定获取的列族 //get.addFamily(Bytes.toBytes(cf)); //2.2指定获取的列 get.addColumn(Bytes.toBytes(cf),Bytes.toBytes(cn)); //2.3设置获取数据的版本数 get.setMaxVersions(); //3.获取数据 Result result = table.get(get); //4.解析result并打印 Cell[] cells = result.rawCells(); for (Cell cell : cells) { //5.打印数据 System.out.println("CF:"+Bytes.toString(CellUtil.cloneFamily(cell)) +",CN:"+Bytes.toString(CellUtil.cloneQualifier(cell)) +",Value:"+Bytes.toString(CellUtil.cloneValue(cell))); } //6.关闭表链接 table.close(); } }
1)过滤器的类型不少,可是能够分为两大类——比较过滤器,专用过滤器
过滤器的做用是在服务端判断数据是否知足条件,而后只将知足条件的数据返回给客户端
2)HBase过滤器的比较运算符:
LESS < LESS_OR_EQUAL <= EQUAL = NOT_EQUAL <> GREATER_OR_EQUAL >= GREATER > NO_OP 排除全部
3)Hbase过滤器的专用过滤器(指定比较机制):
BinaryComparator 按字节索引顺序比较指定字节数组,采用Bytes.compareTo(byte[]) BinaryPrefixComparator 跟前面相同,只是比较左端的数据是否相同 NullComparator 判断给定的是否为空 BitComparator 按位比较 RegexStringComparator 提供一个正则的比较器,仅支持 EQUAL 和非EQUAL SubstringComparator 判断提供的子串是否出如今value中
4)rowKey过滤器RowFilter
需求:经过RowFilter过滤比rowKey 0003小的全部值出来
/** * hbase行键过滤器RowFilter */ @Test public void rowKeyFilter() throws IOException { //获取Configuration Configuration configuration = HBaseConfiguration.create(); //设置Zookeeper集群 configuration.set("hbase.zookeeper.quorum","node01:2181,node02:2181,node03:2181"); //获取链接Connection Connection connection = ConnectionFactory.createConnection(configuration); //获取表 Table myuser = connection.getTable(TableName.valueOf("myuser")); //建立Scan对象 Scan scan = new Scan(); //建立RowKye过滤器 RowFilter rowFilter = new RowFilter(CompareFilter.CompareOp.LESS_OR_EQUAL, new BinaryComparator(Bytes.toBytes("0003"))); //scan对象添加过滤器 scan.setFilter(rowFilter); //获取ResultScanner对象 ResultScanner resultScanner = myuser.getScanner(scan); //遍历循环打印 for (Result result : resultScanner) { //获取rowkey System.out.println(Bytes.toString(result.getRow())); //指定列族以及列打印列当中的数据出来 System.out.println(Bytes.toInt(result.getValue("f1".getBytes(), "id".getBytes()))); System.out.println(Bytes.toInt(result.getValue("f1".getBytes(), "age".getBytes()))); System.out.println(Bytes.toString(result.getValue("f1".getBytes(), "name".getBytes()))); } //关闭资源 myuser.close(); connection.close(); }
5)列族过滤器FamilyFilter
需求:查询比f2列族小的全部的列族内的数据
/** * hbase列族过滤器FamilyFilter */ @Test public void familyFilter() throws IOException { //获取Configuration Configuration configuration = HBaseConfiguration.create(); //设置Zookeeper集群 configuration.set("hbase.zookeeper.quorum","node01:2181,node02:2181,node03:2181"); //获取链接Connection Connection connection = ConnectionFactory.createConnection(configuration); //获取表 Table myuser = connection.getTable(TableName.valueOf("myuser")); //建立Scan对象 Scan scan = new Scan(); //建立列族过滤器 FamilyFilter familyFilter = new FamilyFilter(CompareFilter.CompareOp.LESS, new SubstringComparator("f2")); //scan对象添加过滤器 scan.setFilter(familyFilter); //获取ResultScanner对象 ResultScanner resultScanner = myuser.getScanner(scan); //遍历循环打印 for (Result result : resultScanner) { //获取rowkey System.out.println(Bytes.toString(result.getRow())); //指定列族以及列打印列当中的数据出来 System.out.println(Bytes.toInt(result.getValue("f1".getBytes(), "id".getBytes()))); System.out.println(Bytes.toInt(result.getValue("f1".getBytes(), "age".getBytes()))); System.out.println(Bytes.toString(result.getValue("f1".getBytes(), "name".getBytes()))); } //关闭资源 myuser.close(); connection.close(); }
6)列过滤器QualifierFilter
需求:只查询name列的值
/** * hbase列过滤器 */ @Test public void qualifierFilter() throws IOException { //获取Configuration Configuration configuration = HBaseConfiguration.create(); //设置Zookeeper集群 configuration.set("hbase.zookeeper.quorum","node01:2181,node02:2181,node03:2181"); //获取链接Connection Connection connection = ConnectionFactory.createConnection(configuration); //获取表 Table myuser = connection.getTable(TableName.valueOf("myuser")); //建立Scan对象 Scan scan = new Scan(); //建立列过滤器 QualifierFilter= new QualifierFilter(CompareFilter.CompareOp.EQUAL, new SubstringComparator("name")); //scan对象添加过滤器 scan.setFilter(qualifierFilter); //获取ResultScanner对象 ResultScanner resultScanner = myuser.getScanner(scan); //遍历循环打印 for (Result result : resultScanner) { //获取rowkey System.out.println(Bytes.toString(result.getRow())); //指定列族以及列打印列当中的数据出来 // System.out.println(Bytes.toInt(result.getValue("f1".getBytes(), "id".getBytes()))); System.out.println(Bytes.toString(result.getValue("f1".getBytes(), "name".getBytes()))); } //关闭资源 myuser.close(); connection.close(); }
7)列值过滤器ValueFilter
需求:查询全部列当中包含8的数据
/** * hbase值过滤器 * 查询包含8的列值 */ @Test public void valueFilter() throws IOException { //获取Configuration Configuration configuration = HBaseConfiguration.create(); //设置Zookeeper集群 configuration.set("hbase.zookeeper.quorum","node01:2181,node02:2181,node03:2181"); //获取链接Connection Connection connection = ConnectionFactory.createConnection(configuration); //获取表 Table myuser = connection.getTable(TableName.valueOf("myuser")); //建立Scan对象 Scan scan = new Scan(); //建立列值过滤器 ValueFilter valueFilter = new ValueFilter(CompareFilter.CompareOp.EQUAL, new SubstringComparator("8")); //scan对象添加过滤器 scan.setFilter(valueFilter); //获取ResultScanner对象 ResultScanner resultScanner = myuser.getScanner(scan); //遍历循环打印 for (Result result : resultScanner) { //获取rowkey System.out.println(Bytes.toString(result.getRow())); //指定列族以及列打印列当中的数据出来 // System.out.println(Bytes.toInt(result.getValue("f1".getBytes(), "id".getBytes()))); System.out.println(Bytes.toString(result.getValue("f2".getBytes(), "phone".getBytes()))); } //关闭资源 myuser.close(); connection.close(); }
8)单列值过滤器 SingleColumnValueFilter
需求:SingleColumnValueFilter会返回知足条件的整列值的全部字段
/** * 单列值过滤器,返回知足条件的整行数据 */ @Test public void singleColumnFilter() throws IOException { //获取Configuration Configuration configuration = HBaseConfiguration.create(); //设置Zookeeper集群 configuration.set("hbase.zookeeper.quorum","node01:2181,node02:2181,node03:2181"); //获取链接Connection Connection connection = ConnectionFactory.createConnection(configuration); //获取表 Table myuser = connection.getTable(TableName.valueOf("myuser")); //建立Scan对象 Scan scan = new Scan(); //建立单列值过滤器 SingleColumnValueFilter singleColumnValueFilter = new SingleColumnValueFilter("f1".getBytes(), "name".getBytes(), CompareFilter.CompareOp.EQUAL, "刘备".getBytes()); //scan对象添加过滤器 scan.setFilter(singleColumnValueFilter); //获取ResultScanner对象 ResultScanner resultScanner = myuser.getScanner(scan); //遍历循环打印 for (Result result : resultScanner) { //获取rowkey System.out.println(Bytes.toString(result.getRow())); //指定列族以及列打印列当中的数据出来 System.out.println(Bytes.toInt(result.getValue("f1".getBytes(), "id".getBytes()))); System.out.println(Bytes.toString(result.getValue("f1".getBytes(), "name".getBytes()))); System.out.println(Bytes.toString(result.getValue("f2".getBytes(), "phone".getBytes()))); } //关闭资源 myuser.close(); connection.close(); }
9)列值排除过滤器SingleColumnValueExcludeFilter
与SingleColumnValueFilter相反,会排除掉指定的列,其余的列所有返回
10)rowkey前缀过滤器PrefixFilter
需求:查询以00开头的全部前缀的rowkey
/** * 行键前缀过滤器 */ @Test public void preFilter() throws IOException { //获取Configuration Configuration configuration = HBaseConfiguration.create(); //设置Zookeeper集群 configuration.set("hbase.zookeeper.quorum","node01:2181,node02:2181,node03:2181"); //获取链接Connection Connection connection = ConnectionFactory.createConnection(configuration); //获取表 Table myuser = connection.getTable(TableName.valueOf("myuser")); //建立Scan对象 Scan scan = new Scan(); //建立rowKey前缀过滤器 PrefixFilter prefixFilter = new PrefixFilter("00".getBytes()); //scan对象添加过滤器 scan.setFilter(prefixFilter); //获取ResultScanner对象 ResultScanner resultScanner = myuser.getScanner(scan); //遍历循环打印 for (Result result : resultScanner) { //获取rowkey System.out.println(Bytes.toString(result.getRow())); //指定列族以及列打印列当中的数据出来 System.out.println(Bytes.toInt(result.getValue("f1".getBytes(), "id".getBytes()))); System.out.println(Bytes.toString(result.getValue("f1".getBytes(), "name".getBytes()))); System.out.println(Bytes.toString(result.getValue("f2".getBytes(), "phone".getBytes()))); } //关闭资源 myuser.close(); connection.close(); }
11)过滤器综合查询FilterList
需求:使用SingleColumnValueFilter查询f1列族,name为刘备的数据,而且同时知足rowkey的前缀以00开头的数据(PrefixFilter)
/** * 多过滤器组合使用 */ @Test public void manyFilter() throws IOException { //获取Configuration Configuration configuration = HBaseConfiguration.create(); //设置Zookeeper集群 configuration.set("hbase.zookeeper.quorum","node01:2181,node02:2181,node03:2181"); //获取链接Connection Connection connection = ConnectionFactory.createConnection(configuration); //获取表 Table myuser = connection.getTable(TableName.valueOf("myuser")); //建立Scan对象 Scan scan = new Scan(); //建立过滤器集合 FilterList filterList = new FilterList(); //建立单列值过滤器 SingleColumnValueFilter singleColumnValueFilter = new SingleColumnValueFilter("f1".getBytes(), "name".getBytes(), CompareFilter.CompareOp.EQUAL, "刘备".getBytes()); //建立rowKey前缀过滤器 PrefixFilter prefixFilter = new PrefixFilter("00".getBytes()); //过滤器集合添加过滤器 filterList.addFilter(singleColumnValueFilter); filterList.addFilter(prefixFilter); //scan设置过滤器集合 scan.setFilter(filterList); //获取ResultScanner对象 ResultScanner scanner = myuser.getScanner(scan); //遍历循环打印 for (Result result : scanner) { //获取rowkey System.out.println(Bytes.toString(result.getRow())); //指定列族以及列打印列当中的数据出来 // System.out.println(Bytes.toInt(result.getValue("f1".getBytes(), "id".getBytes()))); System.out.println(Bytes.toString(result.getValue("f1".getBytes(), "name".getBytes()))); //System.out.println(Bytes.toString(result.getValue("f2".getBytes(), "phone".getBytes()))); } //关闭资源 myuser.close(); connection.close(); }
public class VideoDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration configuration = new Configuration(); FileSystem fs = FileSystem.get(configuration); Job job = Job.getInstance(configuration,"Video3"); //inputformat Path input = new Path("C:\\Users\\12419\\Desktop\\练习题\\video.txt"); FileInputFormat.addInputPath(job,input); //outputformat Path output = new Path("C:\\Users\\12419\\Desktop\\练习题\\videoEnd"); if(fs.exists(output))fs.delete(output,true); FileOutputFormat.setOutputPath(job,output); //mapper \ keyout \ valueout job.setMapperClass(VideoMapper.class); job.setMapOutputKeyClass(NullWritable.class); job.setMapOutputValueClass(NullWritable.class); //reducer \ keyout \ valueout //submit job.waitForCompletion(true); } public static class VideoMapper extends Mapper<LongWritable, Text, NullWritable,NullWritable> { static FileSystem fs; static FSDataOutputStream fsDataOutputStream; @Override protected void setup(Context context) throws IOException, InterruptedException { fs = FileSystem.get(context.getConfiguration()); fsDataOutputStream = fs.create(new Path("C:\\Users\\12419\\Desktop\\练习题\\练习题3\\video")); } @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String[] datas = line.split(":"); if(datas.length < 10){ return; } StringBuilder sb = new StringBuilder(); for (int i = 0; i < datas.length; i++) { if(i == 3 && datas[i].contains(" & ")){ sb.append(datas[i].replace(" & ", ",")).append("&"); continue; } if(i < 9){ sb.append(datas[i]).append("&"); }else if(i == datas.length - 1){ sb.append(datas[i]+"\n"); }else { sb.append(datas[i]).append(","); } } fsDataOutputStream.write(sb.toString().getBytes()); } @Override protected void cleanup(Context context) throws IOException, InterruptedException { fsDataOutputStream.close(); } } } // @Override // protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // String line = value.toString(); // String[] datas = line.split(":"); // try { // if(datas[3].contains(" & ")){ // String str1 = line.substring(0,line.indexOf(datas[3])); // String str2 = datas[3].replace(" & ", ","); // String str3 = line.substring(str1.length() + datas[3].length()); // line = str1 + str2 + str3; // datas = line.split(":"); // } // }catch (ArrayIndexOutOfBoundsException e){ // return; // } // if(datas.length < 10){ // return; // } // int index = line.indexOf(datas[9]); // String str = line.substring(0,index) + line.substring(index).replace(":",",") + "\n"; // str = str.replace(":", "&"); // fsDataOutputStream.write(str.getBytes()); // }
Driver: TableMapReduceUtil.initTableMapperJob TableMapReduceUtil.initTableReducerJob Mapper:TableMapper<KEYOUT, VALUEOUT> Reducer:TableReducer<KEYIN, VALUEIN, KEYOUT>
1.建立Hive内部表的同时建立Hbase表 create table hive_hbase_emp( id INT, username STRING, password STRING) stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' with serdeproperties("hbase.columns.mapping" = ":key,info:username,info:password") tblproperties("hbase.table.name" = "hbase_emp"); 2.建立Hive外部表与Hbase中存在的表作关联 create external table hive_external_hbase_emp( id INT, username STRING, password STRING) stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' with serdeproperties("hbase.columns.mapping" = ":key,info:username,info:password") tblproperties("hbase.table.name" = "hbase_emp");