That is, HBase serves as the data source for MapReduce, MapReduce performs the analysis, and the output is stored back into an HBase table.
HBase, MapReduce, and the CLASSPATH
By default, MapReduce jobs deployed to a MapReduce cluster do not have access to either the HBase configuration under $HBASE_CONF_DIR or the HBase classes.
The official docs go on about this at some length. The gist: MapReduce jobs do not pick up the HBase dependencies by default. You could copy hbase-site.xml into the Hadoop conf directory, but then it has to be distributed across the whole cluster; or you could edit Hadoop's CLASSPATH directly, but that pollutes the Hadoop environment and requires restarting the Hadoop cluster to take effect.
So the best approach is to let HBase add its own dependencies to Hadoop's CLASSPATH itself, and then run the program:
bin/hbase mapredcp
# First put the HBase jars on the classpath (the env assignment and the command are separated by a space), then run the MapReduce program
$ HADOOP_CLASSPATH=`$HBASE_HOME/bin/hbase mapredcp` $HADOOP_HOME/bin/hadoop jar $HBASE_HOME/lib/hbase-server-1.2.0-cdh5.12.0.jar
Running the jar without a program name prints the list of built-in MapReduce programs:

# Count the number of cells
CellCounter: Count cells in HBase table.
WALPlayer: Replay WAL files.
# ****** Bulk loading of large data sets ****** the most important one: MapReduce writes TSV/CSV files
# directly as HFiles (HBase's block-stored file format), which are then loaded (moved) into the table
# instead of being inserted row by row through the normal write path
completebulkload: Complete a bulk data load.
# Copy a table from one cluster to another
copytable: Export a table from local cluster to peer cluster.
# Export/import data between HBase and HDFS
export: Write table data to HDFS.
exportsnapshot: Export the specific snapshot to a given FileSystem.
import: Import data written by Export.
# TSV is tab-separated, CSV is comma-separated
importtsv: Import data in TSV format.
# Count the number of rows
rowcounter: Count rows in HBase table.
verifyrep: Compare the data from tables in two different clusters.
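As a minimal sketch of how these tools are driven (the table name "user", the column mapping, and the HDFS paths below are placeholders for illustration, not from the original post):

# Count the rows of a table
$ HADOOP_CLASSPATH=`$HBASE_HOME/bin/hbase mapredcp` $HADOOP_HOME/bin/hadoop jar \
    $HBASE_HOME/lib/hbase-server-1.2.0-cdh5.12.0.jar rowcounter user

# Bulk load, step 1: importtsv writes HFiles to an HDFS directory instead of inserting rows
$ HADOOP_CLASSPATH=`$HBASE_HOME/bin/hbase mapredcp` $HADOOP_HOME/bin/hadoop jar \
    $HBASE_HOME/lib/hbase-server-1.2.0-cdh5.12.0.jar importtsv \
    -Dimporttsv.columns=HBASE_ROW_KEY,info:name \
    -Dimporttsv.bulk.output=/tmp/user_hfiles \
    user /input/user.tsv

# Bulk load, step 2: completebulkload moves the generated HFiles into the table's regions
$ HADOOP_CLASSPATH=`$HBASE_HOME/bin/hbase mapredcp` $HADOOP_HOME/bin/hadoop jar \
    $HBASE_HOME/lib/hbase-server-1.2.0-cdh5.12.0.jar completebulkload /tmp/user_hfiles user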
package com.gci.hadoop.hbase;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Requirement: read info:name from the source table "user" and write it to info:name in the new table "basic".
 */
// extends Configured implements Tool: the Tool interface's run() method is the real entry point
public class Table_user2basic extends Configured implements Tool {

    public static final String sourceTable = "user";
    public static final String targetTable = "basic";

    // 1. Mapper class extends TableMapper<KEYOUT, VALUEOUT>
    // The plain Mapper takes four type parameters (input KV and output KV); TableMapper fixes the input types,
    // see the source: extends Mapper<ImmutableBytesWritable, Result, KEYOUT, VALUEOUT>
    // Put is an HBase type, chosen here so it can serve directly as the Reducer's input value type
    public static class ReadUserMapper extends TableMapper<Text, Put> {

        private Text mapOutputKey = new Text();

        @Override
        public void map(ImmutableBytesWritable key, Result value,
                Mapper<ImmutableBytesWritable, Result, Text, Put>.Context context)
                throws IOException, InterruptedException {
            // get rowKey
            String rowKey = Bytes.toString(key.get());

            // set output rowKey
            mapOutputKey.set(rowKey);

            // build a Put from the rowKey
            Put put = new Put(key.get());

            // iterate over the cells
            for (Cell cell : value.rawCells()) {
                // check the family first; see the HBase API usage notes (checking "info" first avoids a NullPointerException)
                if ("info".equals(Bytes.toString(CellUtil.cloneFamily(cell)))) {
                    // add column: name
                    if ("name".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {
                        put.add(cell);
                    }
                }
            }

            // emit the rowKey and the Put to the reducer
            context.write(mapOutputKey, put);
        }
    }

    // 2. Reducer class extends TableReducer<KEYIN, VALUEIN, KEYOUT>
    // The output key type is ImmutableBytesWritable, a byte array implementing WritableComparable
    // The output value type is Mutation, the parent class of Delete, Put, Increment and Append
    public static class WriteBasicReducer extends TableReducer<Text, Put, ImmutableBytesWritable> {

        @Override
        public void reduce(Text key, Iterable<Put> values,
                Reducer<Text, Put, ImmutableBytesWritable, Mutation>.Context context)
                throws IOException, InterruptedException {
            // take the Puts received from the Mapper
            for (Put put : values) {
                // and write them out
                context.write(null, put);
            }
        }
    }

    // 3. Driver
    public int run(String[] arg0) throws Exception {
        // create job
        Job job = Job.getInstance(this.getConf(), this.getClass().getSimpleName());

        // set run job class
        job.setJarByClass(this.getClass());

        // set job
        Scan scan = new Scan();
        scan.setCaching(500); // rows fetched per RPC; 1 is the default in Scan, which will be bad for MapReduce jobs
        scan.setCacheBlocks(false); // don't set to true for MR jobs
        // set other scan attrs

        // set input and set mapper
        TableMapReduceUtil.initTableMapperJob(sourceTable, // input table
                scan, // Scan instance to control CF and attribute selection
                ReadUserMapper.class, // mapper class
                Text.class, // mapper output key
                Put.class, // mapper output value
                job);

        // set reducer and output
        TableMapReduceUtil.initTableReducerJob(targetTable, // output table
                WriteBasicReducer.class, // reducer class
                job);

        job.setNumReduceTasks(1); // number of reducers: at least one, adjust as required

        // submit job
        boolean isSuccess = job.waitForCompletion(true);

        return isSuccess ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        // get configuration
        Configuration configuration = HBaseConfiguration.create();

        // submit job
        int status = ToolRunner.run(configuration, new Table_user2basic(), args);

        // exit program
        System.exit(status);
    }
}
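To submit the job above, one approach (the jar name here is a placeholder; the main class is the one defined above) is to package the class and run it with the HBase dependencies exposed the same way as in the CLASSPATH section:

# Package the class (e.g. mvn package), then submit with the HBase jars on the classpath
$ export HADOOP_CLASSPATH=`$HBASE_HOME/bin/hbase mapredcp`
$ $HADOOP_HOME/bin/hadoop jar hbase-mr-demo.jar com.gci.hadoop.hbase.Table_user2basic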