HBaseMapReduce3:
Writing HDFS file contents into HBase:
For large files that need to be stored in HBase, the idea is to first upload the file to HDFS, read it as <key, value> pairs in the map phase, and then write those key-value pairs into HBase in the reduce phase.
In this example the input is a fixed, hard-coded file in HDFS, and the reduce phase writes the key-value pairs into HBase.
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;

public class HBaseAndMapReduce3 {

    public static void main(String[] args) throws Exception {
        System.exit(run());
    }

    public static int run() throws Exception {
        Configuration conf = new Configuration();
        conf = HBaseConfiguration.create(conf);
        conf.set("hbase.zookeeper.quorum", "192.168.226.129");

        Job job = Job.getInstance(conf, "findFriend");
        job.setJarByClass(HBaseAndMapReduce3.class);

        // KeyValueTextInputFormat turns each input line into a <Text, Text>
        // pair; no mapper is set, so the default identity mapper forwards them.
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        // Input is the output file of the previous job, hard-coded here.
        FileInputFormat.addInputPath(job, new Path(
                "hdfs://192.168.226.129:9000/hbasemapreduce1/2016051818564427/part-r-00000"));

        // Write the data into the HBase table "friend".
        TableMapReduceUtil.initTableReducerJob("friend", FindFriendReducer.class, job);

        checkTable(conf);
        return job.waitForCompletion(true) ? 0 : 1;
    }

    // Create the target table if it does not exist yet.
    private static void checkTable(Configuration conf) throws Exception {
        try (Connection con = ConnectionFactory.createConnection(conf);
             Admin admin = con.getAdmin()) {
            TableName tn = TableName.valueOf("friend");
            if (!admin.tableExists(tn)) {
                HTableDescriptor htd = new HTableDescriptor(tn);
                HColumnDescriptor hcd = new HColumnDescriptor("person");
                htd.addFamily(hcd);
                admin.createTable(htd);
                System.out.println("Table did not exist; created it successfully....");
            }
        }
    }

    public static class FindFriendReducer extends
            TableReducer<Text, Text, ImmutableBytesWritable> {

        @Override
        protected void reduce(Text key, Iterable<Text> values,
                Reducer<Text, Text, ImmutableBytesWritable, Mutation>.Context context)
                throws IOException, InterruptedException {
            // Note: Text.getBytes() returns Text's reused internal buffer, which
            // can carry stale trailing bytes from a longer previous key/value --
            // this is what produced the garbled rows ("hbasep", "...BerggBerg")
            // in the scan output below. Copying via toString() avoids that.
            byte[] rowKey = Bytes.toBytes(key.toString());
            Put put = new Put(rowKey);
            // Only the first value for each key is stored.
            put.addColumn(Bytes.toBytes("person"), Bytes.toBytes("nickname"),
                    Bytes.toBytes(values.iterator().next().toString()));
            context.write(new ImmutableBytesWritable(rowKey), put);
        }
    }
}
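Because no mapper class is set on the job, Hadoop's built-in identity Mapper forwards the <Text, Text> pairs produced by KeyValueTextInputFormat straight to the reducer. Purely for illustration, a minimal sketch of the equivalent explicit mapper (hypothetical; it is not part of the job above, and registering it via job.setMapperClass(PassThroughMapper.class) would change nothing):

// Makes the implicit identity mapping explicit; assumes
// org.apache.hadoop.mapreduce.Mapper is imported.
public static class PassThroughMapper extends Mapper<Text, Text, Text, Text> {
    @Override
    protected void map(Text key, Text value, Context context)
            throws IOException, InterruptedException {
        // key = text before the first tab of a line, value = the rest
        context.write(key, value);
    }
}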
// Contents of the source data file:
hadoop	Berg-OSChina,BergBerg
hbase	OSChina,BergBerg
zookeeper	OSChina,BergBerg
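KeyValueTextInputFormat splits each line at the first separator, tab by default: everything before the tab becomes the key (used as the row key here) and the rest becomes the value. If the source file used a different delimiter, it could be overridden on the Configuration before building the job; a minimal sketch, assuming the Hadoop 2.x property name:

// Default separator is "\t"; override only if the input uses
// a different delimiter between row key and value.
conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", "\t");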
// After writing the HDFS file contents into HBase, a full-table scan from the client shows:
hbase(main):003:0> scan 'friend'
ROW          COLUMN+CELL
 hadoop      column=person:nickname, timestamp=1463748372584, value=Berg-OSChina,BergBerg
 hbasep      column=person:nickname, timestamp=1463748372584, value=OSChina,BergBerggBerg
 zookeeper   column=person:nickname, timestamp=1463748372584, value=OSChina,BergBerggBerg
3 row(s) in 0.2850 seconds
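Note the garbled row key hbasep and the values ending in BerggBerg: the original reducer called Text.getBytes(), which returns Text's reused internal buffer, so a shorter key or value keeps stale trailing bytes from the longer one deserialized before it ("hbase" picks up the trailing "p" of "hadoop"; "OSChina,BergBerg" picks up the trailing "gBerg" of "Berg-OSChina,BergBerg"). With the Bytes.toBytes(key.toString()) copy used in the corrected reducer above, the scan would show the clean rows hadoop, hbase and zookeeper.

The same verification can also be done from Java instead of the HBase shell; a minimal sketch, assuming the same table name and ZooKeeper quorum as above:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class ScanFriend {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "192.168.226.129");
        try (Connection con = ConnectionFactory.createConnection(conf);
             Table table = con.getTable(TableName.valueOf("friend"));
             ResultScanner scanner = table.getScanner(new Scan())) {
            for (Result r : scanner) {
                // Print each row key and its person:nickname cell.
                byte[] nick = r.getValue(Bytes.toBytes("person"),
                        Bytes.toBytes("nickname"));
                System.out.println(Bytes.toString(r.getRow())
                        + " => " + Bytes.toString(nick));
            }
        }
    }
}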