HBase (7): HBase Advanced Programming



I. HBase with MapReduce

Why use MapReduce to access data stored in HBase?
To speed up analysis and to scale out analytical capacity.
Using MapReduce to analyze HBase data is strictly an offline (batch) analysis scenario.

       

1. HBaseToHDFS

Read data from HBase, process it, and write the results to HDFS. Code implementation:

package com.ghgj.hbase.hbase2hdfsmr;

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Purpose: read the user_info table from HBase and write the results out to HDFS.
 */
public class HBaseToHDFSMR {

    private static final String ZK_CONNECT = "hadoop03:2181,hadoop04:2181,hadoop05:2181";

    public static void main(String[] args) throws Exception {

        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", ZK_CONNECT);
        System.setProperty("HADOOP_USER_NAME", "hadoop");
//      conf.set("fs.defaultFS", "hdfs://myha01/");

        Job job = Job.getInstance(conf);
        job.setJarByClass(HBaseToHDFSMR.class);

        Scan scan = new Scan();
        scan.addColumn(Bytes.toBytes("base_info"), Bytes.toBytes("name"));
        /**
         * TableMapReduceUtil: the "Util" suffix marks it as a helper class.
         * A class ending in "Factory" would instead be a factory responsible for creating objects.
         */
        TableMapReduceUtil.initTableMapperJob("user_info", scan,
                HBaseToHDFSMRMapper.class, Text.class, NullWritable.class, job);
        job.setReducerClass(HBaseToHDFSMRReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // Delete the output directory if it already exists.
        Path outputPath = new Path("/hbase2hdfs/output");
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(outputPath)) {
            fs.delete(outputPath, true);
        }
        FileOutputFormat.setOutputPath(job, outputPath);

        boolean waitForCompletion = job.waitForCompletion(true);
        System.exit(waitForCompletion ? 0 : 1);
    }

    static class HBaseToHDFSMRMapper extends TableMapper<Text, NullWritable> {
        /**
         * key: the rowkey
         * value: one Result instance per call to map(); it holds the
         * rowkey, family, qualifier, value and timestamp of the matched cells.
         */
        @Override
        protected void map(ImmutableBytesWritable key, Result value,
                Mapper<ImmutableBytesWritable, Result, Text, NullWritable>.Context context)
                throws IOException, InterruptedException {
            String rowkey = Bytes.toString(key.copyBytes());
            System.out.println(rowkey);
            List<Cell> cells = value.listCells();
            for (int i = 0; i < cells.size(); i++) {
                Cell cell = cells.get(i);
                String rowkey_result = Bytes.toString(cell.getRow()) + "\t"
                        + Bytes.toString(cell.getFamily()) + "\t"
                        + Bytes.toString(cell.getQualifier()) + "\t"
                        + Bytes.toString(cell.getValue()) + "\t"
                        + cell.getTimestamp();
                context.write(new Text(rowkey_result), NullWritable.get());
            }
        }
    }

    static class HBaseToHDFSMRReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
        @Override
        protected void reduce(Text key, Iterable<NullWritable> values,
                Reducer<Text, NullWritable, Text, NullWritable>.Context context)
                throws IOException, InterruptedException {
            context.write(key, NullWritable.get());
        }
    }
}
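
One common way to submit the job is sketched below; it assumes the class above is packaged as hbase2hdfs.jar (a hypothetical jar name) and that the HBase client jars are made visible to the client via hbase classpath:

# put the HBase jars on the client classpath, then submit the job
export HADOOP_CLASSPATH=$(hbase classpath)
hadoop jar hbase2hdfs.jar com.ghgj.hbase.hbase2hdfsmr.HBaseToHDFSMR

# inspect the output written to HDFS
hdfs dfs -cat /hbase2hdfs/output/part-r-*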

2. HDFSToHBase

Read data from HDFS, process it, and write it into HBase. Code implementation:

package com.ghgj.hbase.hbase2hdfsmr;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class HDFSToHBaseMR {

    private static final String ZK_CONNECT = "hadoop03:2181,hadoop04:2181,hadoop05:2181";
    private static final String TABLE_NAME = "person_info";

    public static void main(String[] args) throws Exception {

        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", ZK_CONNECT);
        System.setProperty("HADOOP_USER_NAME", "hadoop");
        Job job = Job.getInstance(conf);
        job.setJarByClass(HDFSToHBaseMR.class);

        // (Re)create the target HBase table person_info with one column family.
        HBaseAdmin admin = new HBaseAdmin(conf);
        HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(TABLE_NAME));
        htd.addFamily(new HColumnDescriptor("base_info"));
        if (admin.tableExists(TABLE_NAME)) {
            admin.disableTable(TABLE_NAME);
            admin.deleteTable(TABLE_NAME);
        }
        admin.createTable(htd);
        admin.close();

        // Assign the mapper class to the job and wire the reducer to the target table.
        job.setMapperClass(HDFSToHBaseMRMapper.class);
        TableMapReduceUtil.initTableReducerJob(TABLE_NAME, HDFSToHBaseMRReducer.class, job);

        // Declare the output key-value types of the mapper and reducer.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(ImmutableBytesWritable.class);
        job.setOutputValueClass(Mutation.class);

        // Input path: the output of the HBaseToHDFSMR job above.
        FileInputFormat.setInputPaths(job, new Path("/hbase2hdfs/output"));

        // Submit the job and wait for completion.
        boolean boo = job.waitForCompletion(true);
        System.exit(boo ? 0 : 1);
    }

    static class HDFSToHBaseMRMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, Text, NullWritable>.Context context)
                throws IOException, InterruptedException {
            // Pass each input line through unchanged; the reducer parses it.
            context.write(value, NullWritable.get());
        }
    }

    /**
     * Extending TableReducer instead of Reducer simply fixes the output value type to Mutation.
     */
    static class HDFSToHBaseMRReducer extends TableReducer<Text, NullWritable, ImmutableBytesWritable> {

        /**
         * Input line format: rowkey \t family \t qualifier \t value \t timestamp
         * e.g. baiyc_20150716_0001  base_info  name  baiyc1  1488348387443
         */
        @Override
        protected void reduce(Text key, Iterable<NullWritable> values,
                Reducer<Text, NullWritable, ImmutableBytesWritable, Mutation>.Context context)
                throws IOException, InterruptedException {

            String[] splits = key.toString().split("\t");
            String rowkeyStr = splits[0];
            ImmutableBytesWritable rowkey = new ImmutableBytesWritable(Bytes.toBytes(rowkeyStr));

            Put put = new Put(Bytes.toBytes(rowkeyStr));

            String family = splits[1];
            String qualifier = splits[2];
            String value = splits[3];
            String ts = splits[4];

            put.add(Bytes.toBytes(family), Bytes.toBytes(qualifier), Long.parseLong(ts), Bytes.toBytes(value));

            context.write(rowkey, put);
        }
    }
}
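
After the job finishes, the imported rows can be spot-checked in the HBase shell, for example:

scan 'person_info', {LIMIT => 5}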

II. Transferring data between HBase and MySQL

1. Importing MySQL data into HBase (with Sqoop)

Command:

sqoop import --connect jdbc:mysql://hadoop01/mytest --username root --password root
--table student --hbase-create-table --hbase-table studenttest --column-family name
--hbase-row-key id

 

This command may fail with: Exception in thread "main" java.lang.NoSuchMethodError: org.apache.hadoop.hbase.HTableDescriptor.addFamily(Lorg/apache/hadoop/hbase/HColumnDescriptor;)V. The error is caused by a version incompatibility between Sqoop and HBase; it can be worked around by creating the HBase table in advance and importing without automatic table creation.
Use the following command instead:

sqoop import --connect jdbc:mysql://hadoop01/mytest --username root --password root
--table student --hbase-table studenttest1 --column-family name --hbase-row-key id
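
Because --hbase-create-table is omitted here, the target table must already exist; a minimal HBase shell statement for the table and column family used in the command above would be:

create 'studenttest1', 'name'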

 

--hbase-create-table   create the table in HBase automatically
--column-family name   specify the column family name
--hbase-row-key id     specify which MySQL column is used as the rowkey

2. Exporting HBase data to MySQL

There is currently no command that exports HBase data directly to MySQL, but the data can first be exported from HBase to HDFS and then loaded from HDFS into MySQL.

Workaround:
First export the HBase data to HDFS or Hive, then import that data into MySQL.
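
As a sketch of the second step, assuming the HBase data has already been dumped to HDFS as tab-separated files (the MySQL table student2 and the export directory below are placeholders, not values from this walkthrough), a Sqoop export might look like:

sqoop export --connect jdbc:mysql://hadoop01/mytest --username root --password root \
  --table student2 --export-dir /hbase2mysql/output \
  --input-fields-terminated-by '\t'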

III. Integrating HBase with Hive

How it works:

Hive and HBase are integrated through their respective public APIs, communicating mainly via the HBaseStorageHandler. Through the HBaseStorageHandler, Hive can obtain the HBase table name, column families and columns that a Hive table maps to, as well as the InputFormat and OutputFormat classes, and it can create and delete HBase tables.

When Hive reads data from an HBase table, it actually does so through MapReduce: inside the MR job, HiveHBaseTableInputFormat splits the HBase table and obtains a RecordReader to read the data.

The split rule is one Region per split, i.e. the MR job gets as many map tasks as the table has Regions.

Reading the HBase table is done by building a Scanner and scanning the table; filter conditions are translated into Filters, and a predicate on the rowkey becomes a rowkey filter. The Scanner fetches data from the RegionServer via RPC calls to next().

1. Prepare the HBase table and data

create 'mingxing',{NAME => 'base_info',VERSIONS => 1},{NAME => 'extra_info',VERSIONS => 1}

Insert data:

put 'mingxing','rk001','base_info:name','huangbo'
put 'mingxing','rk001','base_info:age','33'
put 'mingxing','rk001','extra_info:math','44'
put 'mingxing','rk001','extra_info:province','beijing'
put 'mingxing','rk002','base_info:name','xuzheng'
put 'mingxing','rk002','base_info:age','44'
put 'mingxing','rk003','base_info:name','wangbaoqiang'
put 'mingxing','rk003','base_info:age','55'
put 'mingxing','rk003','base_info:gender','male'
put 'mingxing','rk004','extra_info:math','33'
put 'mingxing','rk004','extra_info:province','tianjin'
put 'mingxing','rk004','extra_info:children','3'
put 'mingxing','rk005','base_info:name','liutao'
put 'mingxing','rk006','extra_info:name','liujialing'

2. Hive-side operations
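
A typical Hive-side mapping for the mingxing table above, written with the standard HBaseStorageHandler syntax, might look like the following sketch (the Hive table name mingxing_hive and the selected columns are assumptions):

CREATE EXTERNAL TABLE mingxing_hive (
  rowkey   string,
  name     string,
  age      string,
  math     string,
  province string
)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES (
  "hbase.columns.mapping" = ":key,base_info:name,base_info:age,extra_info:math,extra_info:province"
)
TBLPROPERTIES ("hbase.table.name" = "mingxing");

-- queries on mingxing_hive now run as MapReduce scans over the HBase table
select * from mingxing_hive;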

 

IV. HBaseToHBase by MapReduce

package com.ghgj.hbase.hbase2hdfsmr;

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class HBaseToHBaseByMR {

    private static final String ZK_CONNECT = "hadoop03:2181,hadoop04:2181,hadoop05:2181";
    private static final String OLD_TABLE_NAME = "user_info";
    private static final String NEW_TABLE_NAME = "person_info2";
    private static final String FAMILY = "base_info";
    private static final String QUALIFIER = "age";

    public static void main(String[] args) throws Exception {

        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", ZK_CONNECT);
        System.setProperty("HADOOP_USER_NAME", "hadoop");
        // conf.set("fs.defaultFS", "hdfs://myha01/");

        Job job = Job.getInstance(conf);
        job.setJarByClass(HBaseToHBaseByMR.class);

        // (Re)create the target HBase table person_info2 with one column family.
        HBaseAdmin admin = new HBaseAdmin(conf);
        HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(NEW_TABLE_NAME));
        htd.addFamily(new HColumnDescriptor(FAMILY));
        if (admin.tableExists(NEW_TABLE_NAME)) {
            admin.disableTable(NEW_TABLE_NAME);
            admin.deleteTable(NEW_TABLE_NAME);
        }
        admin.createTable(htd);
        admin.close();

        Scan scan = new Scan();
        scan.addColumn(Bytes.toBytes(FAMILY), Bytes.toBytes(QUALIFIER));
        // Wire the source table to the mapper and the target table to the reducer.
        TableMapReduceUtil.initTableMapperJob(OLD_TABLE_NAME, scan,
                HBaseToHBaseByMRMapper.class, Text.class, NullWritable.class, job);
        TableMapReduceUtil.initTableReducerJob(NEW_TABLE_NAME, HBaseToHBaseByMRReducer.class, job);

        // Declare the output key-value types of the mapper and reducer.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(ImmutableBytesWritable.class);
        job.setOutputValueClass(Mutation.class);

        boolean waitForCompletion = job.waitForCompletion(true);
        System.exit(waitForCompletion ? 0 : 1);
    }

    static class HBaseToHBaseByMRMapper extends TableMapper<Text, NullWritable> {
        /**
         * key: the rowkey
         * value: one Result instance per call to map(); it holds the
         * rowkey, family, qualifier, value and timestamp of the matched cells.
         */
        @Override
        protected void map(ImmutableBytesWritable key, Result value,
                Mapper<ImmutableBytesWritable, Result, Text, NullWritable>.Context context)
                throws IOException, InterruptedException {
            String rowkey = Bytes.toString(key.copyBytes());
            System.out.println(rowkey);
            List<Cell> cells = value.listCells();
            for (int i = 0; i < cells.size(); i++) {
                Cell cell = cells.get(i);
                String rowkey_result = Bytes.toString(cell.getRow()) + "\t"
                        + Bytes.toString(cell.getFamily()) + "\t"
                        + Bytes.toString(cell.getQualifier()) + "\t"
                        + Bytes.toString(cell.getValue()) + "\t"
                        + cell.getTimestamp();
                context.write(new Text(rowkey_result), NullWritable.get());
            }
        }
    }

    /**
     * Extending TableReducer instead of Reducer simply fixes the output value type to Mutation.
     */
    static class HBaseToHBaseByMRReducer extends TableReducer<Text, NullWritable, ImmutableBytesWritable> {

        /**
         * Input key format: rowkey \t family \t qualifier \t value \t timestamp
         * e.g. baiyc_20150716_0001  base_info  name  baiyc1  1488348387443
         */
        @Override
        protected void reduce(Text key, Iterable<NullWritable> values,
                Reducer<Text, NullWritable, ImmutableBytesWritable, Mutation>.Context context)
                throws IOException, InterruptedException {

            String[] splits = key.toString().split("\t");
            String rowkeyStr = splits[0];
            ImmutableBytesWritable rowkey = new ImmutableBytesWritable(Bytes.toBytes(rowkeyStr));

            Put put = new Put(Bytes.toBytes(rowkeyStr));

            String family = splits[1];
            String qualifier = splits[2];
            String value = splits[3];
            String ts = splits[4];

            put.add(Bytes.toBytes(family), Bytes.toBytes(qualifier), Long.parseLong(ts), Bytes.toBytes(value));

            context.write(rowkey, put);
        }
    }
}