实战:在Java Web 项目中使用HBase

在此以前咱们使用Mysql做为数据源,但发现这数据增加速度太快,而且因为种种缘由,所以必须使用HBase,因此咱们要把Mysql表里面的数据迁移到HBase中,在这里我就不讲解、不争论为何要使用HBase,HBase是什么了,喜欢的就认真看下去,总有些地方是有用的 java

咱们要作的3大步骤: mysql


  1. 新建HBase表格。 web

  2. 把MYSQL数据迁移到HBase中。 sql

  3. 在Java Web项目中读取HBase的数据。 数据库


先介绍一下必要的一些环境: apache

HBase的版本:0.98.8-hadoop2 windows

所需的依赖包api

commons-codec-1.7.jar
commons-collections-3.2.1.jar
commons-configuration-1.6.jar
commons-lang-2.6.jar
commons-logging-1.1.3.jar
guava-12.0.1.jar
hadoop-auth-2.5.0.jar
hadoop-common-2.5.0.jar
hbase-client-0.98.8-hadoop2.jar
hbase-common-0.98.8-hadoop2.jar
hbase-protocol-0.98.8-hadoop2.jar
htrace-core-2.04.jar
jackson-core-asl-1.9.13.jar
jackson-mapper-asl-1.9.13.jar
log4j-1.2.17.jar
mysql-connector-java-5.1.7-bin.jar
netty-3.6.6.Final.jar
protobuf-java-2.5.0.jar
slf4j-api-1.7.5.jar
slf4j-log4j12-1.7.5.jar
zookeeper-3.4.6.jar

若是在你的web项目中有些包已经存在,保留其中一个就行了,省得报奇怪的错误就麻烦了。 app


步骤1:建表 分布式

在此以前,我在Mysql中的业务数据表一共有6个,其结构重复性过高了,首先看看我在HBase里面的表结构:

表名 kpi
key fid+tid+date
簇(family) base gpower userate consum time
描述 基础信息 发电量相关指标 可利用率 自耗电量 累计运行小时数 检修小时数 利用小时数
列(qualifier) fid tid date power windspeed unpower theory coup time power num cpower gpower runtime checktime usetime
描述 风场ID 风机号 日期 发电量 风速 弃风电量 理论电量 耦合度 故障时间 故障损失电量 故障台次 当天自耗电量 当天发电量 当天并网秒数 当天检修秒数 当天利用秒数

这个表中咱们有5个family,其中base Family是对应6个mysql表中的key列, gpower、userate、consum分别对应一个表,time对应3个表。

这个kpi表的rowkey设计是base中的3个qualifier,分别从3个维度查询数据,这样的设计已经能够知足咱们的需求了。

具体在HBase中如何建表如何搭建环境本身参考我写的【手把手教你配置HBase彻底分布式环境】这篇文章吧。


步骤2:把MySQL数据迁移到HBase


这时我用gpower对应的mysql表来作演示吧,其余表的道理都同样。(这里可能有人会说为何不用第三方插件直接数据库对数据库迁移,这里我统一回答一下,我不会,我也不须要。)

okay,首先咱们来看看代码先吧:

import java.io.File;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.log4j.BasicConfigurator;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;

public class GpowerTransfer{
	
	private static final String QUOREM = "192.168.103.50,192.168.103.51,192.168.103.52,192.168.103.53,192.168.103.54,192.168.103.55,192.168.103.56,192.168.103.57,192.168.103.58,192.168.103.59,192.168.103.60";//这里是你HBase的分布式集群结点,用逗号分开。
	private static final String CLIENT_PORT = "2181";//端口
	private static Logger log = Logger.getLogger(GpowerTransfer.class);
	
	/**
	 * @param args
	 */
	public static void main(String[] args) {
		BasicConfigurator.configure();
		log.setLevel(Level.DEBUG);
		String tableName = "kpi";//HBase表名称
		Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", QUOREM);   
        conf.set("hbase.zookeeper.property.clientPort", CLIENT_PORT);
        try { File workaround = new File(".");
            System.getProperties().put("hadoop.home.dir", workaround.getAbsolutePath());
            new File("./bin").mkdirs();
            new File("./bin/winutils.exe").createNewFile();//这几段奇怪的代码在windows跑的时候不加有时候分报错,在web项目中能够不要,但单独的java程序仍是加上去吧,知道什么缘由的小伙伴能够告诉我一下,不胜感激。
			HBaseAdmin admin = new HBaseAdmin(conf);
			if(admin.tableExists(tableName)){
				Class.forName("com.mysql.jdbc.Driver");//首先将mysql中的数据读取出来,而后再插入到HBase中
				String url = "jdbc:mysql://192.168.***.***:3306/midb?useUnicode=true&characterEncoding=utf-8";
				String username = "********";
				String password = "********";
				Connection con = DriverManager.getConnection(url, username, password);
				PreparedStatement pstmt = con.prepareStatement("select * from kpi_gpower");
				ResultSet rs = pstmt.executeQuery();
				HTable table = new HTable(conf, tableName);
				log.debug(tableName + ":start copying data to hbase...");
				List<Put> list = new ArrayList<Put>();
				SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd");
				String base = "base";//family名称
				String gpower = "gpower";//family名称
				String[] qbase = {"fid","tid","date"};//qualifier名称
				String[] qgpower = {"power","windspeed","unpower","theory","coup"};//qualifier名称
				while(rs.next()){
				    String rowKey = rs.getString("farmid") + ":" + (rs.getInt("turbineid")<10?("0"+rs.getInt("turbineid")):rs.getInt("turbineid")) + ":" + sdf.format(rs.getDate("vtime"));//拼接rowkey
				    Put put = new Put(Bytes.toBytes(rowKey));//新建一条记录,而后下面对相应的列进行赋值
				    put.add(base.getBytes(), qbase[0].getBytes(), Bytes.toBytes(rs.getString("farmid")));//base:fid
				    put.add(base.getBytes(), qbase[1].getBytes(), Bytes.toBytes(rs.getInt("turbineid")+""));//base:tid
				    put.add(base.getBytes(), qbase[2].getBytes(), Bytes.toBytes(rs.getDate("vtime")+""));//base:date
				    put.add(gpower.getBytes(), qgpower[0].getBytes(), Bytes.toBytes(rs.getFloat("value")+""));//gpower:power
				    put.add(gpower.getBytes(), qgpower[1].getBytes(), Bytes.toBytes(rs.getFloat("windspeed")+""));//gpower:windspeed
				    put.add(gpower.getBytes(), qgpower[2].getBytes(), Bytes.toBytes(rs.getFloat("unvalue")+""));//gpower:unvalue
				    put.add(gpower.getBytes(), qgpower[3].getBytes(), Bytes.toBytes(rs.getFloat("theory")+""));//gpower:theory
				    put.add(gpower.getBytes(), qgpower[4].getBytes(), Bytes.toBytes(rs.getFloat("coup")+""));//gpower:coup
				    list.add(put);
				}
				table.put(list);//这里真正对表进行插入操做
				log.debug(tableName + ":completed data copy!");
				table.close();//这里要很是注意一点,若是你频繁地对表进行打开跟关闭,性能将会直线降低,可能跟集群有关系。
			}else{
				admin.close();
				log.error("table '" + tableName + "' not exisit!");
				throw new IllegalArgumentException("table '" + tableName + "' not exisit!");
			}
			admin.close();
		} catch (Exception e) {
			e.printStackTrace();
		} 
	}
}



在put语句进行add的时候要特别注意:对于int、float、Date等等非String类型的数据,要记得将其转换成String类型,这里我直接用+""解决了,不然在你读取数据的时候就会遇到麻烦了。

步骤3:Java Web项目读取HBase里面的数据

ok,咱们成功地把数据迁移到HBase,咱们剩下的任务就是在Web应用中读取数据了。

首先咱们要确保Web项目中已经把必要的Jar包添加到ClassPath了,下面我对一些HBase的链接作了小封装:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HBaseAdmin;

/**
 * @author a01513
 *
 */
public class HBaseConnector {
	
	private static final String QUOREM = "192.168.103.50,192.168.103.51,192.168.103.52,192.168.103.53,192.168.103.54,192.168.103.55,192.168.103.56,192.168.103.57,192.168.103.58,192.168.103.59,192.168.103.60";
	private static final String CLIENT_PORT = "2181";
	private HBaseAdmin admin;
	private Configuration conf;
	
	
	public HBaseAdmin getHBaseAdmin(){
		getConfiguration();
        try {
			admin = new HBaseAdmin(conf);
		} catch (Exception e) {
			e.printStackTrace();
		}
		return admin; 
	}
	
	public Configuration getConfiguration(){
		if(conf == null){
			conf = HBaseConfiguration.create();
	        conf.set("hbase.zookeeper.quorum", QUOREM);   
	        conf.set("hbase.zookeeper.property.clientPort", CLIENT_PORT);   
		}
		return conf;
	}



这里的代码基本上跟迁移的那部分代码同样,因为我在其余地方都要重用这些代码,就装在一个地方省得重复写了。

我在Service层作了一下测试,下面看看具体的读取过程:

private final String tableName = "kpi";
@Override
	public List<GenPowerEntity> getGenPower(String farmid,int ltb,int htb,String start,String end) {
		List<GenPowerEntity> list = new ArrayList<GenPowerEntity>();
		HBaseConnector hbaseConn = new HBaseConnector();
		HBaseAdmin admin = hbaseConn.getHBaseAdmin();
		try {
			if(admin.tableExists(tableName)){
				HTable table = new HTable(hbaseConn.getConfiguration(), tableName);
				Scan scan = new Scan();
				scan.addFamily(Bytes.toBytes("base"));
				scan.addFamily(Bytes.toBytes("gpower"));
				scan.addFamily(Bytes.toBytes("userate"));
				String startRowKey = new String();
				String stopRowKey = new String();
				if("".equals(start) && !"".equals(end)){
					stopRowKey = farmid + ":" + Tools.addZero(htb) + ":" + end;
					scan.setStopRow(Bytes.toBytes(stopRowKey));
				}else if(!"".equals(start) && "".equals(end)){
					startRowKey = farmid + ":" + Tools.addZero(ltb) + ":" + start;
					scan.setStartRow(Bytes.toBytes(startRowKey));
				}else if(!"".equals(start) && !"".equals(end)){
					startRowKey = farmid + ":" + Tools.addZero(ltb) + ":" + start;
					stopRowKey = farmid + ":" + Tools.addZero(htb) + ":" + end;
					scan.setStartRow(Bytes.toBytes(startRowKey));
					scan.setStopRow(Bytes.toBytes(stopRowKey));
				}else{
					table.close();
					admin.close();
					return null;
				}
				ResultScanner rsc = table.getScanner(scan);
				Iterator<Result> it = rsc.iterator();
				List<GenPowerEntity> slist = new ArrayList<GenPowerEntity>();
				List<UseRateEntity> ulist = new ArrayList<UseRateEntity>();
				String tempRowKey = "";//这个临时rowkey是用来判断一行数据是否已经读取完了的。
				GenPowerEntity gpower = new GenPowerEntity();
				UseRateEntity userate = new UseRateEntity();
				while(it.hasNext()){
					for(Cell cell: it.next().rawCells()){
						String rowKey = new String(cell.getRowArray(),cell.getRowOffset(),cell.getRowLength(),"UTF-8");
						String family = new String(cell.getFamilyArray(),cell.getFamilyOffset(),cell.getFamilyLength(),"UTF-8");
						String qualifier = new String(cell.getQualifierArray(),cell.getQualifierOffset(),cell.getQualifierLength(),"UTF-8");
						String value = new String(cell.getValueArray(),cell.getValueOffset(),cell.getValueLength(),"UTF-8");//假如咱们当时插入HBase的时候没有把int、float等类型的数据转换成String,这里就会乱码了,而且用Bytes.toInt()这个方法还原也没有用,哈哈
						System.out.println("RowKey=>"+rowKey+"->"+family+":"+qualifier+"="+value);
						if("".equals(tempRowKey))
							tempRowKey = rowKey;
						if(!rowKey.equals(tempRowKey)){
							slist.add(gpower);
							ulist.add(userate);
							gpower = null;
							userate = null;
							gpower = new GenPowerEntity();
							userate = new UseRateEntity();
							tempRowKey = rowKey;
						}
						switch(family){
						case "base":
							switch(qualifier){
							case "fid":
								gpower.setFarmid(value);
								userate.setFarmid(value);
								break;
							case "tid":
								gpower.setTurbineid(Integer.parseInt(value));
								userate.setTurbineid(Integer.parseInt(value));
								break;
							case "date":
								SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
								Date date = null;
								try {
									date = sdf.parse(value);
								} catch (ParseException e) {
									e.printStackTrace();
								}
								gpower.setVtime(date);
								userate.setVtime(date);
								break;
							}
							break;
						case "gpower":
							switch(qualifier){
							case "power":
								gpower.setValue(Float.parseFloat(value));
								break;
							case "windspeed":
								gpower.setWindspeed(Float.parseFloat(value));
								break;
							case "unpower":
								gpower.setUnvalue(Float.parseFloat(value));
								break;
							case "theory":
								gpower.setTvalue(Float.parseFloat(value));
								break;
							case "coup":
								gpower.setCoup(Float.parseFloat(value));
								break;
							}
							break;
						case "userate":
							switch(qualifier){
							case "num":
								userate.setFnum(Integer.parseInt(value));
								break;
							case "power":
								userate.setFpower(Float.parseFloat(value));
								break;
							case "time":
								userate.setFvalue(Float.parseFloat(value));
								break;
							}
							break;
						}
						
					}
				}
				rsc.close();
				table.close();
				admin.close();
				......
			}
		} catch (IOException e) {
			e.printStackTrace();
		}
		return list;
	}



这是我在Service层中用做测试的一个方法,业务逻辑代码能够直接无视(已经用.....代替了,哈哈),至此咱们的全部工做完成,对于更深刻的应用,还要靠本身去认真挖掘学习了。

相关文章
相关标签/搜索