HBase是一个分布式的、面向列的开源数据库,它是一个适合于非结构化数据存储的数据库。另外一个不一样的是HBase基于列的而不是基于行的模式。
大:上亿行、百万列
面向列:面向列(族)的存储和权限控制,列(簇)独立检索
稀疏:对于为空(null)的列,并不占用存储空间,所以,表的设计的很是的稀疏html
功能:
1) 监控RegionServer
2) 处理RegionServer故障转移
3) 处理元数据的变动
4) 处理region的分配或移除
5) 在空闲时间进行数据的负载均衡
6) 经过Zookeeper发布本身的位置给客户端java
功能:
1) 负责存储HBase的实际数据
2) 处理分配给它的Region
3) 刷新缓存到HDFS
4) 维护HLog
5) 执行压缩
6) 负责处理Region分片node
组件:mysql
Hbase表的分片,HBase表会根据RowKey值被切分红不一样的region存储在RegionServer中,在一个RegionServer中能够有多个不一样的region。linux
一个RegionServer能够包含多个HRegion,每一个RegionServer维护一个HLog,和多个HFiles以及其对应的MemStore。RegionServer运行于DataNode上,数量能够与DatNode数量一致,请参考以下架构图:web
肯定一个单元格的位置(cell),须要以下四个算法
rowkey + Colume Family + Colume + timestamp(版本version),数据有版本的概念,即一个单元格可能有多个值,可是只有最新得一个对外显示。sql
Row Key:
行键,Table的主键,Table中的记录默认按照Row Key升序排序shell
Timestamp:
时间戳,每次数据操做对应的时间戳,能够看做是数据的version number数据库
Column Family:
列簇,Table在水平方向有一个或者多个Column Family组成,一个Column Family中能够由任意多个Column组成,即Column Family支持动态扩展,无需预先定义Column的数量以及类型,全部Column均以二进制格式存储,用户须要自行进行类型转换。
Table & Region:
当Table随着记录数不断增长而变大后,会逐渐分裂成多份splits,成为regions,一个region由[startkey,endkey)表示,不一样的region会被Master分配给相应的RegionServer进行管理:.
HMaster:
HMaster没有单点问题,HBase中能够启动多个HMaster,经过Zookeeper的Master Election机制保证总有一个Master运行,HMaster在功能上主要负责Table和Region的管理工做:1. 管理用户对Table的增、删、改、查操做。2. 管理HRegionServer的负载均衡,调整Region分布。3. 在Region Split后,负责新Region的分配。4. 在HRegionServer停机后,负责失效HRegionServer 上的Regions迁移
HRegionServer:
HRegionServer内部管理了一系列HRegion对象,每一个HRegion对应了Table中的一个Region,HRegion中由多个HStore组成。每一个HStore对应了Table中的一个Column Family的存储,能够看出每一个Column Family其实就是一个集中的存储单元,所以最好将具有共同IO特性的column放在一个Column Family中,这样最高效。
MemStore & StoreFiles
HStore存储是HBase存储的核心了,其中由两部分组成,一部分是MemStore,一部分是StoreFiles。MemStore是Sorted Memory Buffer,用户写入的数据首先会放入MemStore,当MemStore满了之后会Flush成一个StoreFile(底层实现是HFile),当StoreFile文件数量增加到必定阈值,会触发Compact合并操做,将多个StoreFiles合并成一个StoreFile,合并过程当中会进行版本合并和数据删除,所以能够看出HBase其实只有增长数据,全部的更新和删除操做都是在后续的compact过程当中进行的,这使得用户的写操做只要进入内存中就能够当即返回,保证了HBase I/O的高性能。当StoreFiles Compact后,会逐步造成愈来愈大的StoreFile,当单个StoreFile大小超过必定阈值后,会触发Split操做,同时把当前Region Split成2个Region,父Region会下线,新Split出的2个孩子Region会被HMaster分配到相应的HRegionServer上,使得原先1个Region的压力得以分流到2个Region上。
HLog
每一个HRegionServer中都有一个HLog对象,HLog是一个实现Write Ahead Log的类,在每次用户操做写入MemStore的同时,也会写一份数据到HLog文件中,HLog文件按期会滚动出新的,并删除旧的文件(已持久化到StoreFile中的数据)。当HRegionServer意外终止后,HMaster会经过Zookeeper感知到,HMaster首先会处理遗留的 HLog文件,将其中不一样Region的Log数据进行拆分,分别放到相应region的目录下,而后再将失效的region从新分配,领取 到这些region的HRegionServer在Load Region的过程当中,会发现有历史HLog须要处理,所以会Replay HLog中的数据到MemStore中,而后flush到StoreFiles,完成数据恢复。
文件类型
HBase中的全部数据文件都存储在Hadoop HDFS文件系统上,主要包括上述提出的两种文件类型:
首先保证Zookeeper集群的正常部署,并启动之:
/opt/module/zookeeper-3.4.5/bin/zkServer.sh start
Hadoop集群的正常部署并启动:
/opt/module/hadoop-2.8.4/sbin/start-dfs.sh /opt/module/hadoop-2.8.4/sbin/start-yarn.sh
解压HBase到指定目录:
tar -zxf /opt/software/hbase-1.3.1-bin.tar.gz -C /opt/module/
须要修改HBase对应的配置文件。
hbase-env.sh修改内容:
export JAVA_HOME=/opt/module/jdk1.8.0_121 export HBASE_MANAGES_ZK=false 尖叫提示:若是使用的是JDK8以上版本,注释掉hbase-env.sh的45-47行,否则会报警告
hbase-site.xml修改内容:
<property> <name>hbase.rootdir</name> <value>hdfs://bigdata111:9000/hbase</value> </property> <property> <name>hbase.cluster.distributed</name> <value>true</value> </property> <property> <name>hbase.master.port</name> <value>16000</value> </property> <property> <name>hbase.zookeeper.quorum</name> <value>bigdata111:2181,bigdata112:2181,bigdata113:2181</value> </property> <property> <name>hbase.zookeeper.property.dataDir</name> <value>/opt/module/zookeeper-3.4.10/zkData</value> </property> <property> <name>hbase.master.maxclockskew</name> <value>180000</value> </property>
regionservers:
bigdata111 bigdata112 bigdata113
因为HBase须要依赖Hadoop,因此替换HBase的lib目录下的jar包,以解决兼容问题:
1) 删除原有的jar:
rm -rf /opt/module/hbase-1.3.1/lib/hadoop-* rm -rf /opt/module/hbase-1.3.1/lib/zookeeper-3.4.10.jar
2)拷贝新jar,涉及的jar有:
hadoop-annotations-2.8.4.jar hadoop-auth-2.8.4.jar hadoop-client-2.8.4.jar hadoop-common-2.8.4.jar hadoop-hdfs-2.8.4.jar hadoop-mapreduce-client-app-2.8.4.jar hadoop-mapreduce-client-common-2.8.4.jar hadoop-mapreduce-client-core-2.8.4.jar hadoop-mapreduce-client-hs-2.8.4.jar hadoop-mapreduce-client-hs-plugins-2.8.4.jar hadoop-mapreduce-client-jobclient-2.8.4.jar hadoop-mapreduce-client-jobclient-2.8.4-tests.jar hadoop-mapreduce-client-shuffle-2.8.4.jar hadoop-yarn-api-2.8.4.jar hadoop-yarn-applications-distributedshell-2.8.4.jar hadoop-yarn-applications-unmanaged-am-launcher-2.8.4.jar hadoop-yarn-client-2.8.4.jar hadoop-yarn-common-2.8.4.jar hadoop-yarn-server-applicationhistoryservice-2.8.4.jar hadoop-yarn-server-common-2.8.4.jar hadoop-yarn-server-nodemanager-2.8.4.jar hadoop-yarn-server-resourcemanager-2.8.4.jar hadoop-yarn-server-tests-2.8.4.jar hadoop-yarn-server-web-proxy-2.8.4.jar zookeeper-3.4.10.jar //尖叫提示:这些jar包的对应版本应替换成你目前使用的hadoop版本,具体状况具体分析。 //查找jar包举例: find /opt/module/hadoop-2.8.4/ -name hadoop-annotations* //而后将找到的jar包复制到HBase的lib目录下便可。
ln -s /opt/module/hadoop-2.8.4/etc/hadoop/core-site.xml /opt/module/hbase-1.3.1/conf/core-site.xml ln -s /opt/module/hadoop-2.8.4/etc/hadoop/hdfs-site.xml /opt/module/hbase-1.3.1/conf/hdfs-site.xml
vi /etc/profile
export HBASE_HOME=/opt/module/hbase-1.3.1 export PATH=$HBASE_HOME/bin:$PATH source /etc/profile
scp -r /opt/module/hbase-1.3.1/ bigdata112:/opt/module/ scp -r /opt/module/hbase-1.3.1/ bigdata113:/opt/module/
启动方式1:
bin/hbase-daemon.sh start master bin/hbase-daemon.sh start regionserver 尖叫提示:若是集群之间的节点时间不一样步,会致使regionserver没法启动,抛出ClockOutOfSyncException异常。
启动方式2:
bin/start-hbase.sh 对应的中止服务: bin/stop-hbase.sh
//启动成功后,能够经过“host:port”的方式来访问HBase管理页面,例如: http://bigdata111:16010
bin/hbase shell
hbase(main)> help
hbase(main)> list
hbase(main)> list_namespace
hbase(main)> create 'student','info' hbase(main)> create 'iparkmerchant_order','smzf' hbase(main)> create 'staff','info'
hbase(main) > put 'student','1001','info:name','Thomas' hbase(main) > put 'student','1001','info:sex','male' hbase(main) > put 'student','1001','info:age','18' hbase(main) > put 'student','1002','info:name','Janna' hbase(main) > put 'student','1002','info:sex','female' hbase(main) > put 'student','1002','info:age','20'
数据插入后的数据模型
hbase(main) > scan 'student' hbase(main) > scan 'student',{STARTROW => '1001', STOPROW => '1001'} hbase(main) > scan 'student',{STARTROW => '1001'} 注:这个是从哪个rowkey开始扫描
hbase(main):012:0> desc 'student'
hbase(main) > put 'student','1001','info:name','Nick' hbase(main) > put 'student','1001','info:age','100' hbase(main) > put 'student','1001','info:isNull',''(仅测试空值问题)
hbase(main) > get 'student','1001' hbase(main) > get 'student','1001','info:name'
//删除某rowkey的所有数据: hbase(main) > deleteall 'student','1001'
hbase(main) > truncate 'student' //尖叫提示:清空表的操做顺序为先disable,而后再truncate。
//首先须要先让该表为disable状态: hbase(main) > disable 'student' //检查这个表是否被禁用 hbase(main) > is_enabled 'hbase_book' hbase(main) > is_disabled 'hbase_book' //恢复被禁用得表 enable 'student' //而后才能drop这个表: hbase(main) > drop 'student' //尖叫提示:若是直接drop表,会报错:Drop the named table. Table must first be disabled ERROR: Table student is enabled. Disable it first.
hbase(main) > count 'student'
//将info列族中的数据存放3个版本: hbase(main) > alter 'student',{NAME=>'info',VERSIONS=>3} //查看student的最新的版本的数据 hbase(main) > get 'student','1001' //查看HBase中的多版本 hbase(main) > get 'student','1001',{COLUMN=>'info:name',VERSIONS=>10}
hbase> status 'bigdata111'
hbase> exist 'hbase_book'
hbase> is_enabled 'hbase_book' hbase> is_disabled 'hbase_book'
为当前表增长列族: hbase> alter 'hbase_book', NAME => 'CF2', VERSIONS => 2 为当前表删除列族: hbase> alter 'hbase_book', 'delete' => 'CF2'
hbase> disable 'hbase_book' hbase> drop 'hbase_book'
删除一行中一个单元格的值,例如: hbase> delete 'hbase_book', 'rowKey', 'CF:C'
hbase> truncate 'hbase_book'
建立多个列族: hbase> create 't1', {NAME => 'f1'}, {NAME => 'f2'}, {NAME => 'f3'}
尖叫提示:由于内存空间是有限的,因此说溢写过程一定伴随着大量的小文件产生。
新建项目后在pom.xml中添加依赖:
<dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-server</artifactId> <version>1.3.1</version> </dependency> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-client</artifactId> <version>1.3.1</version> </dependency>
注意,这部分的学习内容,咱们先学习使用老版本的API,接着再写出新版本的API调用方式。由于在企业中,有些时候咱们须要一些过期的API来提供更好的兼容性。
public static Configuration conf; static{ //使用HBaseConfiguration的单例方法实例化 conf = HBaseConfiguration.create(); conf.set("hbase.zookeeper.quorum", "bigdata111"); conf.set("hbase.zookeeper.property.clientPort", "2181"); conf.set("zookeeper.znode.parent", "/hbase"); }
public static boolean isTableExist(String tableName) throws MasterNotRunningException, ZooKeeperConnectionException, IOException{ //在HBase中管理、访问表须要先建立HBaseAdmin对象 Connection connection = ConnectionFactory.createConnection(conf); HBaseAdmin admin = (HBaseAdmin) connection.getAdmin(); //HBaseAdmin admin = new HBaseAdmin(conf); return admin.tableExists(tableName); }
public static void createTable(String tableName, String... columnFamily) throws MasterNotRunningException, ZooKeeperConnectionException, IOException{ HBaseAdmin admin = new HBaseAdmin(conf); //判断表是否存在 if(isTableExist(tableName)){ System.out.println("表" + tableName + "已存在"); //System.exit(0); }else{ //建立表属性对象,表名须要转字节 HTableDescriptor descriptor = new HTableDescriptor(TableName.valueOf(tableName)); //建立多个列族 for(String cf : columnFamily){ descriptor.addFamily(new HColumnDescriptor(cf)); } //根据对表的配置,建立表 admin.createTable(descriptor); System.out.println("表" + tableName + "建立成功!"); } }
public static void dropTable(String tableName) throws Exception{ HBaseAdmin admin = new HBaseAdmin(conf); if(isTableExist(tableName)){ admin.disableTable(tableName); admin.deleteTable(tableName); System.out.println("表" + tableName + "删除成功!"); }else{ System.out.println("表" + tableName + "不存在!"); } }
public static void addRowData(String tableName, String rowKey, String columnFamily, String column, String value) throws Exception{ //建立HTable对象 HTable hTable = new HTable(conf, tableName); //向表中插入数据 Put put = new Put(Bytes.toBytes(rowKey)); //向Put对象中组装数据 put.add(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(value)); hTable.put(put); hTable.close(); System.out.println("插入数据成功"); }
public static void deleteMultiRow(String tableName, String... rows) throws IOException{ HTable hTable = new HTable(conf, tableName); List<Delete> deleteList = new ArrayList<Delete>(); for(String row : rows){ Delete delete = new Delete(Bytes.toBytes(row)); deleteList.add(delete); } hTable.delete(deleteList); hTable.close(); }
public static void getAllRows(String tableName) throws IOException{ HTable hTable = new HTable(conf, tableName); //获得用于扫描region的对象 Scan scan = new Scan(); //使用HTable获得resultcanner实现类的对象 ResultScanner resultScanner = hTable.getScanner(scan); for(Result result : resultScanner){ Cell[] cells = result.rawCells(); for(Cell cell : cells){ //获得rowkey System.out.println("行键:" + Bytes.toString(CellUtil.cloneRow(cell))); //获得列族 System.out.println("列族" + Bytes.toString(CellUtil.cloneFamily(cell))); System.out.println("列:" + Bytes.toString(CellUtil.cloneQualifier(cell))); System.out.println("值:" + Bytes.toString(CellUtil.cloneValue(cell))); } } }
public static void getRow(String tableName, String rowKey) throws IOException{ HTable table = new HTable(conf, tableName); Get get = new Get(Bytes.toBytes(rowKey)); //get.setMaxVersions();显示全部版本 //get.setTimeStamp();显示指定时间戳的版本 Result result = table.get(get); for(Cell cell : result.rawCells()){ System.out.println("行键:" + Bytes.toString(result.getRow())); System.out.println("列族" + Bytes.toString(CellUtil.cloneFamily(cell))); System.out.println("列:" + Bytes.toString(CellUtil.cloneQualifier(cell))); System.out.println("值:" + Bytes.toString(CellUtil.cloneValue(cell))); System.out.println("时间戳:" + cell.getTimestamp()); } }
public static void getRowQualifier(String tableName, String rowKey, String family, String qualifier) throws IOException{ HTable table = new HTable(conf, tableName); Get get = new Get(Bytes.toBytes(rowKey)); get.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier)); Result result = table.get(get); for(Cell cell : result.rawCells()){ System.out.println("行键:" + Bytes.toString(result.getRow())); System.out.println("列族" + Bytes.toString(CellUtil.cloneFamily(cell))); System.out.println("列:" + Bytes.toString(CellUtil.cloneQualifier(cell))); System.out.println("值:" + Bytes.toString(CellUtil.cloneValue(cell))); } }
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.NamespaceDescriptor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.util.Bytes; import java.io.IOException; import java.text.DecimalFormat; import java.util.Iterator; import java.util.TreeSet; /** * @author Andy * 一、NameSpace ====> 命名空间 * 二、createTable ===> 表 * 三、isTable ====> 判断表是否存在 * 四、Region、RowKey、分区键 */ public class HBaseUtil { /** * 初始化命名空间 * * @param conf 配置对象 * @param namespace 命名空间的名字 * @throws Exception */ public static void initNameSpace(Configuration conf, String namespace) throws Exception { Connection connection = ConnectionFactory.createConnection(conf); Admin admin = connection.getAdmin(); //命名空间描述器 NamespaceDescriptor nd = NamespaceDescriptor .create(namespace) .addConfiguration("AUTHOR", "Andy") .build(); //经过admin对象来建立命名空间 admin.createNamespace(nd); System.out.println("已初始化命名空间"); //关闭两个对象 close(admin, connection); } /** * 关闭admin对象和connection对象 * * @param admin 关闭admin对象 * @param connection 关闭connection对象 * @throws IOException IO异常 */ private static void close(Admin admin, Connection connection) throws IOException { if (admin != null) { admin.close(); } if (connection != null) { connection.close(); } } /** * 建立HBase的表 * @param conf * @param tableName * @param regions * @param columnFamily */ public static void createTable(Configuration conf, String tableName, int regions, String... columnFamily) throws IOException { Connection connection = ConnectionFactory.createConnection(conf); Admin admin = connection.getAdmin(); //判断表 if (isExistTable(conf, tableName)) { return; } //表描述器 HTableDescriptor HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName)); for (String cf : columnFamily) { //列描述器 :HColumnDescriptor htd.addFamily(new HColumnDescriptor(cf)); } //htd.addCoprocessor("hbase.CalleeWriteObserver"); //建立表 admin.createTable(htd,genSplitKeys(regions)); System.out.println("已建表"); //关闭对象 close(admin,connection); } /** * 分区键 * @param regions region个数 * @return splitKeys */ private static byte[][] genSplitKeys(int regions) { //存放分区键的数组 String[] keys = new String[regions]; //格式化分区键的形式 00 01 02 DecimalFormat df = new DecimalFormat("00"); for (int i = 0; i < regions; i++) { keys[i] = df.format(i) + ""; } byte[][] splitKeys = new byte[regions][]; //排序 保证你这个分区键是有序得 TreeSet<byte[]> treeSet = new TreeSet<>(Bytes.BYTES_COMPARATOR); for (int i = 0; i < regions; i++) { treeSet.add(Bytes.toBytes(keys[i])); } //输出 Iterator<byte[]> iterator = treeSet.iterator(); int index = 0; while (iterator.hasNext()) { byte[] next = iterator.next(); splitKeys[index++]= next; } return splitKeys; } /** * 判断表是否存在 * @param conf 配置 conf * @param tableName 表名 */ public static boolean isExistTable(Configuration conf, String tableName) throws IOException { Connection connection = ConnectionFactory.createConnection(conf); Admin admin = connection.getAdmin(); boolean result = admin.tableExists(TableName.valueOf(tableName)); close(admin, connection); return result; } }
import java.io.IOException; import java.io.InputStream; import java.util.Properties; public class PropertiesUtil { public static Properties properties = null; static { //获取配置文件、方便维护 InputStream is = ClassLoader.getSystemResourceAsStream("hbase_consumer.properties"); properties = new Properties(); try { properties.load(is); } catch (IOException e) { e.printStackTrace(); } } /** * 获取参数值 * @param key 名字 * @return 参数值 */ public static String getProperty(String key){ return properties.getProperty(key); } }
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; public class HBaseDAO { private static String namespace = PropertiesUtil.getProperty("hbase.calllog.namespace"); private static String tableName = PropertiesUtil.getProperty("hbase.calllog.tablename"); private static Integer regions = Integer.valueOf(PropertiesUtil.getProperty("hbase.calllog.regions")); public static void main(String[] args) throws Exception { Configuration conf = HBaseConfiguration.create(); conf.set("hbase.zookeeper.property.clientPort", "2181"); conf.set("hbase.zookeeper.quorum", "bigdata111"); conf.set("zookeeper.znode.parent", "/hbase"); if (!HBaseUtil.isExistTable(conf, tableName)) { HBaseUtil.initNameSpace(conf, namespace); HBaseUtil.createTable(conf, tableName, regions, "f1", "f2"); } }
经过HBase的相关JavaAPI,咱们能够实现伴随HBase操做的MapReduce过程,好比使用MapReduce将数据从本地文件系统导入到HBase的表中,好比咱们从HBase中读取一些原始数据后使用MapReduce作数据分析。
$ bin/hbase mapredcp
$ export HBASE_HOME=/opt/module/hbase-1.3.1 $ export HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp`
--案例一:统计Student表中有多少行数据
$ /opt/module/hadoop-2.8.4/bin/yarn jar lib/hbase-server-1.3.1.jar rowcounter ns_ct:calllog
--案例二:使用MapReduce将本地数据导入到HBase
1001 Apple Red 1002 Pear Yellow 1003 Pineapple Yellow //尖叫提示:上面的这个数据不要从word中直接复制,有格式错误
hbase(main):001:0> create 'fruit','info'
$ /opt/module/hadoop-2.8.4/bin/hdfs dfs -mkdir /input_fruit/ $ /opt/module/hadoop-2.8.4/bin/hdfs dfs -put fruit.tsv /input_fruit/
$ /opt/module/hadoop-2.8.4/bin/yarn jar lib/hbase-server-1.3.1.jar importtsv \ -Dimporttsv.columns=HBASE_ROW_KEY,info:name,info:color fruit \ hdfs://bigdata11:9000/input_fruit
hbase(main):001:0> scan 'fruit'
目标:将fruit表中的一部分数据,经过MR迁入到fruit_mr表中。
分步实现:
import java.io.IOException; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableMapper; import org.apache.hadoop.hbase.util.Bytes; public class ReadFruitMapper extends TableMapper<ImmutableBytesWritable, Put> { @Override protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException { //将fruit的name和color提取出来,至关于将每一行数据读取出来放入到Put对象中。 Put put = new Put(key.get()); //遍历添加column行 for(Cell cell: value.rawCells()){ //添加/克隆列族:info if("info".equals(Bytes.toString(CellUtil.cloneFamily(cell)))){ //添加/克隆列:name if("name".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))){ //将该列cell加入到put对象中 put.add(cell); //添加/克隆列:color }else if("color".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))){ //向该列cell加入到put对象中 put.add(cell); } } } //将从fruit读取到的每行数据写入到context中做为map的输出 context.write(key, put); } }
import java.io.IOException; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableReducer; import org.apache.hadoop.io.NullWritable; public class WriteFruitMRReducer extends TableReducer<ImmutableBytesWritable, Put, NullWritable> { @Override protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context) throws IOException, InterruptedException { //读出来的每一行数据写入到fruit_mr表中 for(Put put: values){ context.write(NullWritable.get(), put); } } }
package MRToHBase; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import java.io.IOException; public class Fruit2FruitMRRunner extends Configured implements Tool { @Override public int run(String[] strings) throws Exception { //获得Configuration Configuration conf = this.getConf(); //建立Job任务 Job job = Job.getInstance(conf, this.getClass().getSimpleName()); job.setJarByClass(Fruit2FruitMRRunner.class); //配置Job Scan scan = new Scan(); scan.setCacheBlocks(false); scan.setCaching(500); //设置Mapper,注意导入的是mapreduce包下的,不是mapred包下的,后者是老版本 TableMapReduceUtil.initTableMapperJob( "fruit", //数据源的表名 scan, //scan扫描控制器 ReadFruitMapper.class,//设置Mapper类 ImmutableBytesWritable.class,//设置Mapper输出key类型 Put.class,//设置Mapper输出value值类型 job//设置给哪一个JOB ); //设置Reducer TableMapReduceUtil.initTableReducerJob( "fruit_mr", WriteFruitMRReducer.class, job); //设置Reduce数量,最少1个 job.setNumReduceTasks(1); boolean isSuccess = job.waitForCompletion(true); if(!isSuccess){ throw new IOException("Job running with error"); } return isSuccess ? 0 : 1; } public static void main(String[] args) throws Exception { Configuration conf = HBaseConfiguration.create(); int status = ToolRunner.run(conf, new Fruit2FruitMRRunner(), args); System.exit(status); } }
$ /opt/module/hadoop-2.8.4/bin/yarn jar /opt/module/hbase-1.3.1/HBase-1.0-SNAPSHOT.jar MRToHBase.Fruit2FruitMRRunner //尖叫提示:运行任务前,若是待数据导入的表不存在,则须要提早建立之。
目标:实现将HDFS中的数据写入到HBase表中。
分步实现:
import java.io.IOException; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; public class ReadFruitFromHDFSMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //从HDFS中读取的数据 String lineValue = value.toString(); //读取出来的每行数据使用\t进行分割,存于String数组 String[] values = lineValue.split("\t"); //根据数据中值的含义取值 String rowKey = values[0]; String name = values[1]; String color = values[2]; //初始化rowKey ImmutableBytesWritable rowKeyWritable = new ImmutableBytesWritable(Bytes.toBytes(rowKey)); //初始化put对象 Put put = new Put(Bytes.toBytes(rowKey)); //参数分别:列族、列、值 put.add(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(name)); put.add(Bytes.toBytes("info"), Bytes.toBytes("color"), Bytes.toBytes(color)); context.write(rowKeyWritable, put); } }
import java.io.IOException; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableReducer; import org.apache.hadoop.io.NullWritable; public class WriteFruitMRFromTxtReducer extends TableReducer<ImmutableBytesWritable, Put, NullWritable> { @Override protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context) throws IOException, InterruptedException { //读出来的每一行数据写入到fruit_hdfs表中 for(Put put: values){ context.write(NullWritable.get(), put); } } }
package HDFSToHBase; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import java.io.IOException; public class Txt2FruitRunner extends Configured implements Tool { @Override public int run(String[] strings) throws Exception { //获得Configuration Configuration conf = this.getConf(); //建立Job任务 Job job = Job.getInstance(conf, this.getClass().getSimpleName()); job.setJarByClass(Txt2FruitRunner.class); Path inPath = new Path("hdfs://bigdata11:9000/input_fruit/fruit.tsv"); FileInputFormat.addInputPath(job, inPath); //设置Mapper job.setMapperClass(ReadFruitFromHDFSMapper.class); job.setMapOutputKeyClass(ImmutableBytesWritable.class); job.setMapOutputValueClass(Put.class); //设置Reducer TableMapReduceUtil.initTableReducerJob("fruit_mr", WriteFruitMRFromTxtReducer.class, job); //设置Reduce数量,最少1个 job.setNumReduceTasks(1); boolean isSuccess = job.waitForCompletion(true); if (!isSuccess) { throw new IOException("Job running with error"); } return isSuccess ? 0 : 1; } public static void main(String[] args) throws Exception { Configuration conf = HBaseConfiguration.create(); int status = ToolRunner.run(conf, new Txt2FruitRunner(), args); System.exit(status); } }
/opt/module/hadoop-2.8.4/bin/yarn jar HDFSToHBase.jar HDFSToHBase.ReadFruitFromHDFSMapper //尖叫提示:运行任务前,若是待数据导入的表不存在,则须要提早建立之。
Hive | HBase | |
---|---|---|
特色 | 类SQL 数据仓库 | NoSQL (Key-value) |
适用场景 | 离线数据分析和清洗 | 适合在线业务 |
延迟 | 延迟高 | 延迟低 |
存储位置 | 存储在HDFS | 存储在HDFS |
环境准备
由于咱们后续可能会在操做Hive的同时对HBase也会产生影响,因此Hive须要持有操做HBase的Jar,那么接下来拷贝Hive所依赖的Jar包(或者使用软链接的形式)。记得还有把zookeeper的jar包考入到hive的lib目录下。
#环境变量/etc/profile $ export HBASE_HOME=/opt/module/hbase-1.3.1 $ export HIVE_HOME=/opt/module/apache-hive-1.2.2-bin
#Shell执行 $ ln -s $HBASE_HOME/lib/hbase-common-1.3.1.jar $HIVE_HOME/lib/hbase-common-1.3.1.jar $ ln -s $HBASE_HOME/lib/hbase-server-1.3.1.jar $HIVE_HOME/lib/hbase-server-1.3.1.jar $ ln -s $HBASE_HOME/lib/hbase-client-1.3.1.jar $HIVE_HOME/lib/hbase-client-1.3.1.jar $ ln -s $HBASE_HOME/lib/hbase-protocol-1.3.1.jar $HIVE_HOME/lib/hbase-protocol-1.3.1.jar $ ln -s $HBASE_HOME/lib/hbase-it-1.3.1.jar $HIVE_HOME/lib/hbase-it-1.3.1.jar $ ln -s $HBASE_HOME/lib/htrace-core-3.1.0-incubating.jar $HIVE_HOME/lib/htrace-core-3.1.0-incubating.jar $ ln -s $HBASE_HOME/lib/hbase-hadoop2-compat-1.3.1.jar $HIVE_HOME/lib/hbase-hadoop2-compat-1.3.1.jar $ ln -s $HBASE_HOME/lib/hbase-hadoop-compat-1.3.1.jar $HIVE_HOME/lib/hbase-hadoop-compat-1.3.1.jar
<property> <name>hive.zookeeper.quorum</name> <value>bigdata11,bigdata12,bigdata13</value> <description>The list of ZooKeeper servers to talk to. This is only needed for read/write locks.</description> </property> <property> <name>hive.zookeeper.client.port</name> <value>2181</value> <description>The port of ZooKeeper servers to talk to. This is only needed for read/write locks.</description> </property>
目标:创建Hive表,关联HBase表,插入数据到Hive表的同时可以影响HBase表。
分步实现:
(1) 在Hive中建立表同时关联HBase
CREATE TABLE hive_hbase_emp_table1( empno int, ename string, job string, mgr int, hiredate string, sal double, comm double, deptno int) STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,info:ename,info:job,info:mgr,info:hiredate,info:sal,info:comm,info:deptno") TBLPROPERTIES ("hbase.table.name" = "hbase_emp_table1");
尖叫提示:完成以后,能够分别进入Hive和HBase查看,都生成了对应的表
(2) 在Hive中建立临时中间表,用于load文件中的数据
尖叫提示:不能将数据直接load进Hive所关联HBase的那张表中
CREATE TABLE emp( empno int, ename string, job string, mgr int, hiredate string, sal double, comm double, deptno int) row format delimited fields terminated by '\t';
(3) 向Hive中间表中load数据
hive> load data local inpath '/opt/module/datas/emp.txt' into table emp;
(4) 经过insert命令将中间表中的数据导入到Hive关联HBase的那张表中
hive> insert into table hive_hbase_emp_table1 select * from emp;
(5) 查看Hive以及关联的HBase表中是否已经成功的同步插入了数据
Hive:
hive> select * from hive_hbase_emp_table;
HBase:
hbase> scan 'hbase_emp_table'
目标:在HBase中已经存储了某一张表hbase_emp_table,而后在Hive中建立一个外部表来关联HBase中的hbase_emp_table这张表,使之能够借助Hive来分析HBase这张表中的数据。
注:该案例2紧跟案例1的脚步,因此完成此案例前,请先完成案例1。
分步实现:
(1) 在Hive中建立外部表
CREATE EXTERNAL TABLE relevance_hbase_emp( empno int, ename string, job string, mgr int, hiredate string, sal double, comm double, deptno int) STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,info:ename,info:job,info:mgr,info:hiredate,info:sal,info:comm,info:deptno") TBLPROPERTIES ("hbase.table.name" = "hbase_emp_table1");
(2) 关联后就可使用Hive函数进行一些分析操做了
hive (default)> select * from relevance_hbase_emp;
Sqoop supports additional import targets beyond HDFS and Hive. Sqoop can also import records into a table in HBase.
以前咱们已经学习过如何使用Sqoop在Hadoop集群和关系型数据库中进行数据的导入导出工做,接下来咱们学习一下利用Sqoop在HBase和RDBMS中进行数据的转储。
相关参数:
参数 | 描述 |
---|---|
--column-family
|
设置导入的目标列族。 |
--hbase-create-table | 是否自动建立不存在的HBase表(这就意味着,不须要手动提早在HBase中先创建表) |
--hbase-table
|
指定数据将要导入到HBase中的哪张表中。 |
--hbase-bulkload | 是否容许bulk形式的导入。 |
--hbase-row-key | mysql中哪一列的值做为HBase的rowkey,若是rowkey是个组合键,则以逗号分隔。(注:避免rowkey的重复) |
目标:将RDBMS中的数据抽取到HBase中
分步实现:
(1) 配置sqoop-env.sh,添加以下内容:
export HBASE_HOME=/opt/module/hbase-1.3.1
(2) 在Mysql中新建一个数据库db_library,一张表book
CREATE DATABASE db_library; CREATE TABLE db_library.book( id int(4) PRIMARY KEY NOT NULL AUTO_INCREMENT, name VARCHAR(255) NOT NULL, price VARCHAR(255) NOT NULL);
(3) 向表中插入一些数据
INSERT INTO db_library.book (name, price) VALUES('Lie Sporting', '30'); INSERT INTO db_library.book (name, price) VALUES('Pride & Prejudice', '70'); INSERT INTO db_library.book (name, price) VALUES('Fall of Giants', '50');
(4) 执行Sqoop导入数据的操做
//手动建立HBase表 hbase> create 'hbase_book','info'
(5) 在HBase中scan这张表获得以下内容
hbase> scan 'hbase_book'
思考:尝试使用复合键做为导入数据时的rowkey。
$ bin/sqoop import \ --connect jdbc:mysql://bigdata11:3306/db_library \ --username root \ --password 000000 \ --table book \ --columns "id,name,price" \ --column-family "info" \ --hbase-create-table \ --hbase-row-key "id" \ --hbase-table "hbase_book" \ --num-mappers 1 \ --split-by id
&emsp 尖叫提示:sqoop1.4.6只支持HBase1.0.1以前的版本的自动建立HBase表的功能
&emsp 能够把Phoenix理解为Hbase的查询引擎,phoenix,由saleforce.com开源的一个项目,后又捐给了Apache。它至关于一个Java中间件,帮助开发者,像使用jdbc访问关系型数据库一些,访问NoSql数据库HBase。
&emsp phoenix,操做的表及数据,存储在hbase上。phoenix只是须要和Hbase进行表关联起来。而后再用工具进行一些读或写操做。
&emsp 其实,能够把Phoenix只当作一种代替HBase的语法的一个工具。虽然能够用java能够用jdbc来链接phoenix,而后操做HBase,可是在生产环境中,不能够用在OLTP中。在线事务处理的环境中,须要低延迟,而Phoenix在查询HBase时,虽然作了一些优化,但延迟仍是不小。因此依然是用在OLAT中,再将结果返回存储下来。
tar -zxvf apache-phoenix-4.14.1-HBase-1.2-bin.tar.gz -C /opt/module mv apache-phoenix-4.14.1-HBase-1.2-bin phoenix-4.14.1 //环境变量vi /etc/profile //在最后两行加上以下phoenix配置 export PHOENIX_HOME=/opt/module/phoenix-4.14.1 export PATH=$PATH:$PHOENIX_HOME/bin //使环境变量配置生效 source /etc/profile //将主节点的phoenix包传到从节点 $ scp -r phoenix-4.14.1 root@bigdata13:/opt/module $ scp -r phoenix-4.14.1 root@bigdata12:/opt/module //拷贝hbase-site.xml(注)三台都要 cp hbase-site.xml /opt/module/phoenix-4.14.1/bin/ //将以下两个jar包,目录在/opt/module/phoenix-4.14.1下,拷贝到hbase的lib目录,目录在/opt/module/hbase-1.3.1/lib/ (注)三台都要 phoenix-4.10.0-HBase-1.2-server.jar phoenix-core-4.10.0-HBase-1.2.jar //启动Phoenix sqlline.py bigdata11:2181
#展现表 > !table #建立表 > create table test(id integer not null primary key,name varchar); > create table "Andy"( id integer not null primary key, name varchar); #删除表 drop table test; #插入数据 > upsert into test values(1,'Andy'); > upsert into users(name) values('toms'); #查询数据 phoenix > select * from test; hbase > scan 'test' #退出phoenix > !q #删除数据 delete from test where id=2; #sum函数的使用 select sum(id) from "Andy"; #增长一列 alter table "Andy" add address varchar; #删除一列 alter table "Andy" drop column address;
其余语法详见:http://phoenix.apache.org/language/index.html
#hbase中建立表 create 'teacher','info','contact' #插入数据 put 'teacher','1001','info:name','Jack' put 'teacher','1001','info:age','28' put 'teacher','1001','info:gender','male' put 'teacher','1001','contact:address','shanghai' put 'teacher','1001','contact:phone','13458646987' put 'teacher','1002','info:name','Jim' put 'teacher','1002','info:age','30' put 'teacher','1002','info:gender','male' put 'teacher','1002','contact:address','tianjian' put 'teacher','1002','contact:phone','13512436987' #在Phoenix建立映射表 create view "teacher"( "ROW" varchar primary key, "contact"."address" varchar, "contact"."phone" varchar, "info"."age" varchar, "info"."gender" varchar, "info"."name" varchar ); #在Phoenix查找数据 select * from "teacher";
当启动regionserver时,regionserver会向HMaster注册并开始接收本地数据,开始的时候,新加入的节点不会有任何数据,平衡器开启的状况下,将会有新的region移动到开启的RegionServer上。若是启动和中止进程是使用ssh和HBase脚本,那么会将新添加的节点的主机名加入到conf/regionservers文件中。
1)$ ./bin/hbase-daemon.sh stop regionserver 2)hbase(main):001:0>balance_switch true
顾名思义,就是从当前HBase集群中删除某个RegionServer,这个过程分为以下几个过程:
在0.90.2以前,咱们只能经过在要卸载的节点上执行
hbase> balance_switch false
[root@bigdata11 hbase-1.3.1] hbase-daemon.sh stop regionserver
hbase> balance_switch true
这种方法很大的一个缺点是该节点上的Region会离线很长时间。由于假如该RegionServer上有大量Region的话,由于Region的关闭是顺序执行的,第一个关闭的Region得等到和最后一个Region关闭并Assigned后一块儿上线。这是一个至关漫长的时间。每一个Region Assigned须要4s,也就是说光Assigned就至少须要2个小时。该关闭方法比较传统,须要花费必定的时间,并且会形成部分region短暂的不可用。
另外一种方案:
$ bin/graceful_stop.sh <RegionServer-hostname>
该命令会自动关闭Load Balancer,而后Assigned Region,以后会将该节点关闭。除此以外,你还能够查看remove的过程,已经assigned了多少个Region,还剩多少个Region,每一个Region 的Assigned耗时
hbase> balance_switch false
在HBase中Hmaster负责监控RegionServer的生命周期,均衡RegionServer的负载,若是Hmaster挂掉了,那么整个HBase集群将陷入不健康的状态,而且此时的工做状态并不会维持过久。因此HBase支持对Hmaster的高可用配置。
$ bin/stop-hbase.sh
$ touch conf/backup-masters
$ echo bigdata112 > conf/backup-masters
$ scp -r conf/ bigdata112:/opt/module/hbase-1.3.1 $ scp -r conf/ bigdata113:/opt/module/hbase-1.3.1
0.98版本以后:http://bigdata111:16010
hdfs-site.xml
属性:dfs.namenode.handler.count 解释:该属性是NameNode服务默认线程数,的默认值是10,根据机器的可用内存能够调整为50~100 属性:dfs.datanode.handler.count 解释:该属性默认值为10,是DataNode的处理线程数,若是HDFS客户端程序读写请求比较多,能够调高到15~20,设置的值越大,内存消耗越多,不要调整的太高,通常业务中,5~10便可。
hdfs-site.xml
属性:dfs.replication 解释:若是数据量巨大,且不是很是之重要,能够调整为2~3,若是数据很是之重要,能够调整为3~5。
hdfs-site.xml
属性:dfs.blocksize 解释:块大小定义,该属性应该根据存储的大量的单个文件大小来设置,若是大量的单个文件都小于100M,建议设置成64M块大小,对于大于100M或者达到GB的这种状况,建议设置成256M,通常设置范围波动在64M~256M之间。
mapred-site.xml
属性:mapreduce.jobtracker.handler.count 解释:该属性是Job任务线程数,默认值是10,根据机器的可用内存能够调整为50~100
mapred-site.xml
属性:mapreduce.tasktracker.http.threads 解释:定义HTTP服务器工做线程数,默认值为40,对于大集群能够调整到80~100
mapred-site.xml
属性:mapreduce.task.io.sort.factor 解释:文件排序时同时合并的数据流的数量,这也定义了同时打开文件的个数,默认值为10,若是调高该参数,能够明显减小磁盘IO,即减小文件读取的次数。
mapred-site.xml
属性:mapreduce.map.speculative 解释:该属性能够设置任务是否能够并发执行,若是任务多而小,该属性设置为true能够明显加快任务执行效率,可是对于延迟很是高的任务,建议改成false,这就相似于迅雷下载。
mapred-site.xml
属性:mapreduce.map.output.compress、mapreduce.output.fileoutputformat.compress 解释:对于大集群而言,建议设置Map-Reduce的输出为压缩的数据,而对于小集群,则不须要。
mapred-site.xml
属性: mapreduce.tasktracker.map.tasks.maximum mapreduce.tasktracker.reduce.tasks.maximum 解释:以上两个属性分别为一个单独的Job任务能够同时运行的Map和Reduce的数量。 设置上面两个参数时,须要考虑CPU核数、磁盘和内存容量。假设一个8核的CPU,业务内容很是消耗CPU,那么能够设置map数量为4,若是该业务不是特别消耗CPU类型的,那么能够设置map数量为40,reduce数量为20。这些参数的值修改完成以后,必定要观察是否有较长等待的任务,若是有的话,能够减小数量以加快任务执行,若是设置一个很大的值,会引发大量的上下文切换,以及内存与磁盘之间的数据交换,这里没有标准的配置数值,须要根据业务和硬件配置以及经验来作出选择。 在同一时刻,不要同时运行太多的MapReduce,这样会消耗过多的内存,任务会执行的很是缓慢,咱们须要根据CPU核数,内存容量设置一个MR任务并发的最大值,使固定数据量的任务彻底加载到内存中,避免频繁的内存和磁盘数据交换,从而下降磁盘IO,提升性能。
大概估算公式:
map = 2 + ⅔cpu_core reduce = 2 + ⅓cpu_core
$ sudo blockdev --setra 32768 /dev/sda //尖叫提示:ra是readahead的缩写
即不容许后台进程进入睡眠状态,若是进程空闲,则直接kill掉释放资源
$ sudo sysctl -w vm.swappiness=0
$ ulimit -n 查看容许最大进程数 $ ulimit -u 查看容许打开最大文件数
优化修改:
$ sudo vi /etc/security/limits.conf 修改打开文件数限制 末尾添加: * soft nofile 1024000 * hard nofile 1024000 Hive - nofile 1024000 hive - nproc 1024000 $ sudo vi /etc/security/limits.d/20-nproc.conf 修改用户打开进程数限制 修改成: #* soft nproc 4096 #root soft nproc unlimited * soft nproc 40960 root soft nproc unlimited
集群中某台机器同步网络时间服务器的时间,集群中其余机器则同步这台机器的时间。
更新补丁前,请先测试新版本补丁对集群节点的兼容性。
hbase-site.xml
参数:zookeeper.session.timeout 解释:In hbase-site.xml, set zookeeper.session.timeout to 30 seconds or less to bound failure detection (20-30 seconds is a good start).该值会直接关系到master发现服务器宕机的最大周期,默认值为30秒(不一样的HBase版本,该默认值不同),若是该值太小,会在HBase在写入大量数据发生而GC时,致使RegionServer短暂的不可用,从而没有向ZK发送心跳包,最终致使认为从节点shutdown。通常20台左右的集群须要配置5台zookeeper。****
每个region维护着startRow与endRowKey,若是加入的数据符合某个region维护的rowKey范围,则该数据交给这个region维护。那么依照这个原则,咱们能够将数据索要投放的分区提早大体的规划好,以提升HBase性能。
hbase> create 'staff','info','partition1',SPLITS => ['1000','2000','3000','4000']
create 'staff2','info','partition2',{NUMREGIONS => 15, SPLITALGO => 'HexStringSplit'}
建立splits.txt文件内容以下:
aaaa bbbb cccc dddd
而后执行:
create 'staff3','partition3',SPLITS_FILE => '/opt/module/hbase-1.3.1/splits.txt'
//自定义算法,产生一系列Hash散列值存储在二维数组中 byte[][] splitKeys = 某个散列值函数 //建立HBaseAdmin实例 HBaseAdmin hAdmin = new HBaseAdmin(HBaseConfiguration.create()); //建立HTableDescriptor实例 HTableDescriptor tableDesc = new HTableDescriptor(tableName); //经过HTableDescriptor实例和散列值二维数组建立带有预分区的HBase表 hAdmin.createTable(tableDesc, splitKeys);
一条数据的惟一标识就是rowkey,那么这条数据存储于哪一个分区,取决于rowkey处于哪一个一个预分区的区间内,设计rowkey的主要目的 ,就是让数据均匀的分布于全部的region中,在必定程度上防止数据倾斜。接下来咱们就谈一谈rowkey经常使用的设计方案。
好比: 本来rowKey为1001的,SHA1后变成:dd01903921ea24941c26a48f2cec24e0bb0e8cc7 本来rowKey为3001的,SHA1后变成:49042c54de64a1e9bf0b33e00245660ef92dc7bd 本来rowKey为5001的,SHA1后变成:7b61dec07e02c188790670af43e717f0f46e8913 在作此操做以前,通常咱们会选择从数据集中抽取样本,来决定什么样的rowKey来Hash后做为每一个分区的临界值。
20170524000001转成10000042507102 20170524000002转成20000042507102
这样也能够在必定程度上散列逐步put进来的数据。
20170524000001_a12e 20170524000001_93i7
HBase操做过程当中须要大量的内存开销,毕竟Table是能够缓存在内存中的,通常会分配整个可用内存的70%给HBase的Java堆。可是不建议分配很是大的堆内存,由于GC过程持续过久会致使RegionServer处于长期不可用状态,通常16~48G内存就能够了,若是由于框架占用内存太高致使系统内存不足,框架同样会被系统服务拖死。
不是不容许追加内容么?没错,请看背景故事:
http://blog.cloudera.com/blog/2009/07/file-appends-in-hdfs/
hdfs-site.xml、hbase-site.xml
属性:dfs.support.append 解释:开启HDFS追加同步,能够优秀的配合HBase的数据同步和持久化。默认值为true。
hdfs-site.xml
属性:dfs.datanode.max.transfer.threads 解释:HBase通常都会同一时间操做大量的文件,根据集群的数量和规模以及数据动做,设置为4096或者更高。默认值:4096
属性:dfs.image.transfer.timeout 解释:若是对于某一次数据操做来说,延迟很是高,socket须要等待更长的时间,建议把该值设置为更大的值(默认60000毫秒),以确保socket不会被timeout掉。
mapred-site.xml
属性: mapreduce.map.output.compress mapreduce.map.output.compress.codec 解释:开启这两个数据能够大大提升文件的写入效率,减小写入时间。第一个属性值修改成true,第二个属性值修改成:org.apache.hadoop.io.compress.GzipCodec或者其余压缩方式。
属性:dfs.datanode.failed.volumes.tolerated 解释: 默认为0,意思是当DataNode中有一个磁盘出现故障,则会认为该DataNode shutdown了。若是修改成1,则一个磁盘出现故障时,数据会被复制到其余正常的DataNode上,当前的DataNode继续工做。
属性:hbase.regionserver.handler.count 解释:默认值为30,用于指定RPC监听的数量,能够根据客户端的请求数进行调整,读写请求较多时,增长此值。
属性:hbase.hregion.max.filesize 解释:默认值10737418240(10GB),若是须要运行HBase的MR任务,能够减少此值,由于一个region对应一个map任务,若是单个region过大,会致使map任务执行时间过长。该值的意思就是,若是HFile的大小达到这个数值,则这个region会被切分为两个Hfile。
属性:hbase.client.write.buffer 解释:用于指定HBase客户端缓存,增大该值能够减小RPC调用次数,可是会消耗更多内存,反之则反之。通常咱们须要设定必定的缓存大小,以达到减小RPC次数的目的。
属性:hbase.client.scanner.caching 解释:用于指定scan.next方法获取的默认行数,值越大,消耗内存越大。
当MemStore达到阈值,将Memstore中的数据Flush进Storefile;compact机制则是把flush出来的小文件合并成大的Storefile文件。split则是当Region达到阈值,会把过大的Region一分为二。
hbase.hregion.memstore.flush.size:134217728
即:这个参数的做用是当单个HRegion内全部的Memstore大小总和超过指定值时,flush该HRegion的全部memstore。RegionServer的flush是经过将请求添加一个队列,模拟生产消费模型来异步处理的。那这里就有一个问题,当队列来不及消费,产生大量积压请求时,可能会致使内存陡增,最坏的状况是触发OOM。
hbase.regionserver.global.memstore.upperLimit:0.4 hbase.regionserver.global.memstore.lowerLimit:0.38
即:当MemStore使用内存总量达到hbase.regionserver.global.memstore.upperLimit指定值时,将会有多个MemStores flush到文件中,MemStore flush 顺序是按照大小降序执行的,直到刷新到MemStore使用内存略小于lowerLimit