一、因为要存储消息内容供 App 端查询,因此每条消息需要为对应的每一个用户生成一条记录,这样在用户量大的情况下数据是海量的,没有上限,且不能清理历史数据。 二、读和写的比例大体差不多。
最终决定采用 HBase 存储,Kafka 做消息中间件来构建整个系统。
我们先来看一下 HBase 是什么,为什么要用 HBase?
HBase 是一种构建在 HDFS 之上的分布式、面向列的存储系统。在需要实时读写、随机访问超大规模数据集时,可以使用 HBase。
1.大:一个表可以有上亿行,上百万列。 2.面向列:面向列(簇)的存储和权限控制,列(簇)独立检索。 3.稀疏:对于为空(NULL)的列,并不占用存储空间,因此,表可以设计得非常稀疏。 4.无模式:每一行都有一个可以排序的主键和任意多的列,列可以根据需要动态增加,同一张表中不同的行可以有截然不同的列。 5.数据多版本:每个单元中的数据可以有多个版本,默认情况下,版本号自动分配,版本号就是单元格插入时的时间戳。 6.数据类型单一:HBase 中的数据都是未经解释的字节数组(通常当作字符串处理),没有类型。
Table table = connection.getTable(TableName.valueOf("表名")); Put put = new Put("112233bbbcccc".getBytes());// 一个PUT表明一行数据,再NEW一个PUT表示第二行数据,每行一个惟一的ROWKEY,此处rowkey为put构造方法中传入的值 put.add("column1".getBytes(), null, "aaa".getBytes());// 本行数据的第一列 put.add("column2".getBytes(), null, "bbb".getBytes());// 本行数据的第三列 put.add("column3".getBytes(), null, "ccc".getBytes());// 本行数据的第三列 table.put(put);// 保存数据
可以看到,如果我们不封装 HBase 的操作,而直接在系统中使用原生 API 的话,会有多麻烦。到此,引出我们的主题:根据 HBase 的特性,搭建一套 ORM 映射操作方式。
首先我们捋清楚几个 HBase 存储时的关键点:
一、tableName:表名,需要根据它获取表连接。 二、family:列簇,建议把经常一起访问的、比较相似的列放在同一个 Column Family 中,这样就可以在访问少数几个列时,只读取尽量少的数据。 三、qualifier:列名,对应列的 value。 四、timestamp:时间戳。
根据这些特点,我们可以通过自定义注解来处理这些关键属性,如下:
@HbaseTable(tableName="t_demo") // 列名 public class Demo { @HbaseColumn(family="rowkey", qualifier="rowkey") // rowkey值 private String id; @HbaseColumn(family="demo", qualifier="content") // 列 private String content; @HbaseColumn(family="demo", qualifier="avg") // 列 private String avg; }
HbaseTable:
package com.muheda.notice.hbase;

import java.lang.annotation.*;

/**
 * Custom type-level annotation that carries the HBase table name of an entity class.
 * It is retained at runtime so the name can be read reflectively.
 *
 * @Author: Sorin
 * @Date: Created in 2018/3/22
 */
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE })
@Inherited
public @interface HbaseTable {
    // Name of the HBase table; defaults to the empty string.
    String tableName() default "";
}
HbaseColumn:
package com.muheda.notice.hbase;

import java.lang.annotation.*;

/**
 * Custom field-level annotation describing which HBase column family and qualifier
 * a bean field maps to. It is retained at runtime so it can be read reflectively.
 *
 * @Author: Sorin
 * @Date: Created in 2018/3/22
 */
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.FIELD })
@Inherited
public @interface HbaseColumn {
    // Column family name; defaults to the empty string.
    String family() default "";
    // Column qualifier (column name); defaults to the empty string.
    String qualifier() default "";
    // When true, HBaseBeanUtil reads the cell's timestamp instead of its value; defaults to false.
    boolean timestamp() default false;
}
接着,我们来封装一个 Dao 的操作:
package com.muheda.notice.hbase;

import org.apache.commons.beanutils.BeanUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.*;

/**
 * Shared DAO helper that wraps the HBase client API for beans annotated with
 * {@link HbaseTable} and {@link HbaseColumn}.
 *
 * <p>Connections, the {@code Admin} handle, cached table handles and the cached
 * family/qualifier mapping are all read from {@code HconnectionFactory}.
 *
 * @Author: Sorin
 * @Date: Created in 2018/3/22
 */
@Component("hBaseDaoUtil")
public class HBaseDaoUtil {

    protected final org.slf4j.Logger logger = LoggerFactory.getLogger(this.getClass());

    /** Logger for static methods, which cannot reach the instance logger. */
    private static final org.slf4j.Logger STATIC_LOGGER = LoggerFactory.getLogger(HBaseDaoUtil.class);

    /**
     * Closes the shared HBase connection held by {@code HconnectionFactory}, if there is one.
     * A failure to close is logged and otherwise ignored.
     */
    public static void close() {
        if (HconnectionFactory.connection != null) {
            try {
                HconnectionFactory.connection.close();
            } catch (IOException e) {
                STATIC_LOGGER.error("Failed to close HBase connection", e);
            }
        }
    }

    /**
     * Reads the table name from the {@link HbaseTable} annotation on the object's class.
     *
     * @param obj annotated bean
     * @return the annotated table name, or an empty string when {@code obj} is null or its
     *         class carries no {@link HbaseTable} annotation (callers treat blank as "no table")
     */
    public String getORMTable(Object obj) {
        if (obj == null) {
            return "";
        }
        HbaseTable table = obj.getClass().getAnnotation(HbaseTable.class);
        return table == null ? "" : table.tableName();
    }

    /**
     * Creates a table with the given column families. Failures are logged, not rethrown.
     *
     * @param tableName    name of the table to create
     * @param familyColumn names of the column families to add
     */
    public void createTable(String tableName, Set<String> familyColumn) {
        TableName tn = TableName.valueOf(tableName);
        try {
            Admin admin = HconnectionFactory.admin;
            HTableDescriptor htd = new HTableDescriptor(tn);
            for (String fc : familyColumn) {
                htd.addFamily(new HColumnDescriptor(fc));
            }
            admin.createTable(htd);
        } catch (IOException e) {
            logger.error("建立"+tableName+"表失败!", e);
        }
    }

    /**
     * Disables and then deletes a table. Failures are logged, not rethrown.
     *
     * @param tableName name of the table to drop
     */
    public void dropTable(String tableName) {
        TableName tn = TableName.valueOf(tableName);
        try {
            Admin admin = HconnectionFactory.admin;
            admin.disableTable(tn);
            admin.deleteTable(tn);
        } catch (IOException e) {
            logger.error("删除"+tableName+"表失败!", e);
        }
    }

    /**
     * Scans the bean's table for rows whose columns equal every given value.
     *
     * @param obj   template bean; supplies the table name and is passed to the bean mapper
     * @param param map of column qualifier to required value; keys that match no cached
     *              qualifier are ignored
     * @return matching beans; empty when the table name is blank or the table is unavailable
     * @throws Exception wrapping any failure raised while building or running the scan
     */
    public <T> List<T> queryScan(T obj, Map<String, String> param) throws Exception {
        List<T> objs = new ArrayList<T>();
        String tableName = getORMTable(obj);
        if (StringUtils.isBlank(tableName)) {
            return objs;
        }
        try {
            Table table = getTable(tableName);
            if (!HconnectionFactory.admin.isTableAvailable(TableName.valueOf(tableName))) {
                return objs;
            }
            Scan scan = new Scan();
            scan.setFilter(buildColumnFilters(tableName, param, CompareFilter.CompareOp.EQUAL));
            collectBeans(table, scan, obj, tableName, objs);
        } catch (Exception e) {
            logger.error("查询失败!", e);
            throw new Exception(e);
        }
        return objs;
    }

    /**
     * Fetches rows by row key.
     *
     * @param obj     template bean; supplies the table name and is passed to the bean mapper
     * @param rowkeys row keys to fetch; blank keys are skipped
     * @return beans for the non-empty results; empty on a blank table name, an unavailable
     *         table, or a failure (which is logged)
     */
    public <T> List<T> get(T obj, String... rowkeys) {
        List<T> objs = new ArrayList<T>();
        String tableName = getORMTable(obj);
        if (StringUtils.isBlank(tableName)) {
            return objs;
        }
        try {
            if (!HconnectionFactory.admin.isTableAvailable(TableName.valueOf(tableName))) {
                return objs;
            }
            for (Result result : getResults(tableName, rowkeys)) {
                if (result == null || result.isEmpty()) {
                    continue;
                }
                // NOTE(review): unlike the scan methods this does not clone the mapped bean.
                // If resultToBeanNew fills and returns the template itself, every element
                // of the list would be the same instance - confirm against HBaseBeanUtil.
                objs.add(HBaseBeanUtil.resultToBeanNew(result, obj, tableName));
            }
        } catch (Exception e) {
            logger.error("Failed to get rows from " + tableName, e);
        }
        return objs;
    }

    /**
     * Saves beans, resolving the table name from their {@link HbaseTable} annotation.
     * The table used for the write is the one resolved from the last converted bean.
     * Beans converted before a conversion failure are still written.
     *
     * @param objs beans to save; null elements are skipped
     * @return true only when every bean was converted and the write succeeded
     */
    public <T> boolean save(T... objs) {
        List<Put> puts = new ArrayList<Put>();
        String tableName = "";
        boolean converted = true;
        try {
            for (Object obj : objs) {
                if (obj == null) {
                    continue;
                }
                String resolved = getORMTable(obj);
                if (StringUtils.isBlank(resolved)) {
                    throw new IllegalArgumentException(
                            obj.getClass().getName() + " has no @HbaseTable table name");
                }
                tableName = resolved;
                puts.add(HBaseBeanUtil.beanToPut(obj, tableName));
            }
        } catch (Exception e) {
            converted = false;
            logger.error("保存Hbase异常!", e);
        }
        boolean written = savePut(puts, tableName);
        // Do not report success when some bean could not be converted.
        return converted && written;
    }

    /**
     * Saves beans into an explicitly named table. Conversion is best-effort per bean:
     * a bean that cannot be converted is logged and skipped.
     *
     * @param tableName target table
     * @param objs      beans to save; null elements are skipped
     */
    public <T> void save(String tableName, T... objs) {
        List<Put> puts = new ArrayList<Put>();
        for (Object obj : objs) {
            if (obj == null) {
                continue;
            }
            try {
                puts.add(HBaseBeanUtil.beanToPut(obj, tableName));
            } catch (Exception e) {
                logger.warn("", e);
            }
        }
        savePut(puts, tableName);
    }

    /**
     * Deletes rows by row key from the bean's table.
     *
     * @param obj     bean whose {@link HbaseTable} annotation supplies the table name
     * @param rowkeys row keys to delete; blank keys are skipped
     */
    public <T> void delete(T obj, String... rowkeys) {
        String tableName = getORMTable(obj);
        if (StringUtils.isBlank(tableName)) {
            return;
        }
        List<Delete> deletes = new ArrayList<Delete>();
        for (String rowkey : rowkeys) {
            if (StringUtils.isBlank(rowkey)) {
                continue;
            }
            deletes.add(new Delete(Bytes.toBytes(rowkey)));
        }
        delete(deletes, tableName);
    }

    /**
     * Executes a batch of deletes against a table. Failures are logged, not rethrown.
     *
     * @param deletes   delete operations to run
     * @param tableName target table
     */
    private void delete(List<Delete> deletes, String tableName) {
        // Validate the name before it is used to resolve the table handle.
        if (StringUtils.isBlank(tableName)) {
            logger.info("tableName为空!");
            return;
        }
        try {
            getTable(tableName).delete(deletes);
        } catch (IOException e) {
            logger.error("删除失败!",e);
        }
    }

    /**
     * Lists the column family names of a table.
     *
     * @param tableName table to inspect
     * @return family names; empty when the lookup fails (the failure is logged)
     */
    public List<String> familys(String tableName) {
        try {
            Table table = getTable(tableName);
            List<String> columns = new ArrayList<String>();
            if (table == null) {
                return columns;
            }
            HTableDescriptor tableDescriptor = table.getTableDescriptor();
            for (HColumnDescriptor columnDescriptor : tableDescriptor.getColumnFamilies()) {
                columns.add(columnDescriptor.getNameAsString());
            }
            return columns;
        } catch (Exception e) {
            logger.error("查询列簇名称失败!" ,e);
        }
        return new ArrayList<String>();
    }

    /**
     * Writes the given puts to a table.
     *
     * @return false when the table name is blank or the write fails, true otherwise
     */
    private boolean savePut(List<Put> puts, String tableName) {
        if (StringUtils.isBlank(tableName)) {
            return false;
        }
        try {
            getTable(tableName).put(puts);
            return true;
        } catch (IOException e) {
            logger.error("Failed to write puts to " + tableName, e);
            return false;
        }
    }

    /**
     * Runs a batch get for the given row keys.
     *
     * @return the results returned by the client; empty when the lookup fails (logged)
     */
    private List<Result> getResults(String tableName, String... rowkeys) {
        List<Result> resultList = new ArrayList<Result>();
        List<Get> gets = new ArrayList<Get>();
        for (String rowkey : rowkeys) {
            if (StringUtils.isBlank(rowkey)) {
                continue;
            }
            gets.add(new Get(Bytes.toBytes(rowkey)));
        }
        try {
            Result[] results = getTable(tableName).get(gets);
            Collections.addAll(resultList, results);
        } catch (Exception e) {
            logger.error("Failed to get rows from " + tableName, e);
        }
        return resultList;
    }

    /**
     * Scans the bean's table for rows whose columns are greater than or equal to every
     * given value. All conditions are combined in one filter list, so each of them applies.
     *
     * @param obj   template bean; supplies the table name and is passed to the bean mapper
     * @param param map of column qualifier to lower-bound value; keys that match no cached
     *              qualifier are ignored
     * @return matching beans; empty when the table name is blank or the table is unavailable
     * @throws Exception wrapping any failure raised while building or running the scan
     */
    public <T> List<T> queryScanGreater(T obj, Map<String, String> param) throws Exception {
        List<T> objs = new ArrayList<T>();
        String tableName = getORMTable(obj);
        if (StringUtils.isBlank(tableName)) {
            return objs;
        }
        try {
            Table table = getTable(tableName);
            if (!HconnectionFactory.admin.isTableAvailable(TableName.valueOf(tableName))) {
                return objs;
            }
            Scan scan = new Scan();
            FilterList filters =
                    buildColumnFilters(tableName, param, CompareFilter.CompareOp.GREATER_OR_EQUAL);
            // Leave the scan unfiltered when no condition matched a known column.
            if (!filters.getFilters().isEmpty()) {
                scan.setFilter(filters);
            }
            collectBeans(table, scan, obj, tableName, objs);
        } catch (Exception e) {
            logger.error("查询失败!", e);
            throw new Exception(e);
        }
        return objs;
    }

    /**
     * Scans one page of rows in reverse order, starting at the given row key and applying
     * equality conditions followed by a page-size filter.
     *
     * @param obj          template bean; supplies the table name and is passed to the bean mapper
     * @param startrowname row key the scan starts from
     * @param pageSize     page size as a decimal string
     * @param param        map of column qualifier to required value
     * @return beans read so far; failures are logged and not rethrown
     */
    public <T> List<T> queryScanPage(T obj, String startrowname, String pageSize, Map<String, String> param) throws Exception {
        List<T> objs = new ArrayList<T>();
        String tableName = getORMTable(obj);
        if (StringUtils.isBlank(tableName)) {
            return objs;
        }
        try {
            Table table = getTable(tableName);
            Filter pageFilter = new PageFilter(Integer.parseInt(pageSize));
            Scan scan = new Scan(Bytes.toBytes(startrowname));
            FilterList filterList = buildColumnFilters(tableName, param, CompareFilter.CompareOp.EQUAL);
            // The page filter goes last, after the column conditions.
            filterList.addFilter(pageFilter);
            scan.setFilter(filterList);
            scan.setReversed(true);
            collectBeans(table, scan, obj, tableName, objs);
        } catch (Exception e) {
            logger.error("查询失败!", e);
        }
        return objs;
    }

    /**
     * Scans rows whose row key starts with the given prefix.
     *
     * @param obj    template bean; supplies the table name and is passed to the bean mapper
     * @param rowkey row key prefix
     * @param <T>    bean type
     * @return beans read so far; failures are logged and not rethrown
     */
    public <T> List<T> queryScanRowkey(T obj, String rowkey) {
        List<T> objs = new ArrayList<T>();
        String tableName = getORMTable(obj);
        if (StringUtils.isBlank(tableName)) {
            return objs;
        }
        try {
            Table table = getTable(tableName);
            Scan scan = new Scan();
            scan.setRowPrefixFilter(Bytes.toBytes(rowkey));
            collectBeans(table, scan, obj, tableName, objs);
        } catch (Exception e) {
            logger.error("查询失败!", e);
        }
        return objs;
    }

    /**
     * Scans rows starting at the given row key, keeping those whose row key has it as a
     * binary prefix. No page-size limit is applied here despite the method name.
     *
     * @param obj    template bean; supplies the table name and is passed to the bean mapper
     * @param rowkey row key prefix, also used as the scan start row
     * @param <T>    bean type
     * @return beans read so far; failures are logged and not rethrown
     */
    public <T> List<T> queryScanRowkeyPage(T obj, String rowkey) {
        List<T> objs = new ArrayList<T>();
        String tableName = getORMTable(obj);
        if (StringUtils.isBlank(tableName)) {
            return objs;
        }
        FilterList filterList = new FilterList();
        Scan scan = new Scan(Bytes.toBytes(rowkey));
        try {
            Table table = getTable(tableName);
            // Bytes.toBytes keeps the comparator's encoding identical to the start row's.
            Filter prefixFilter = new RowFilter(CompareFilter.CompareOp.EQUAL,
                    new BinaryPrefixComparator(Bytes.toBytes(rowkey)));
            filterList.addFilter(prefixFilter);
            scan.setFilter(filterList);
            collectBeans(table, scan, obj, tableName, objs);
        } catch (Exception e) {
            logger.error("查询失败!", e);
        }
        return objs;
    }

    /**
     * Builds one single-column value filter per entry of {@code param} whose key equals a
     * qualifier in the cached family/qualifier mapping of the table.
     *
     * @throws IllegalStateException when no mapping is cached for the table
     */
    private FilterList buildColumnFilters(String tableName, Map<String, String> param,
                                          CompareFilter.CompareOp op) {
        Map<String, List<Map<String, String>>> tableMaps = HconnectionFactory.TableMaps;
        List<Map<String, String>> columns = tableMaps.get(tableName);
        if (columns == null) {
            throw new IllegalStateException("No cached column mapping for table " + tableName);
        }
        FilterList filters = new FilterList();
        for (Map.Entry<String, String> entry : param.entrySet()) {
            for (Map<String, String> column : columns) {
                String family = column.get("family");
                String qualifier = column.get("qualifier");
                if (qualifier != null && qualifier.equals(entry.getKey())) {
                    filters.addFilter(new SingleColumnValueFilter(Bytes.toBytes(family),
                            Bytes.toBytes(entry.getKey()), op, Bytes.toBytes(entry.getValue())));
                }
            }
        }
        return filters;
    }

    /**
     * Runs the scan and appends a clone of each mapped bean to {@code sink}. The scanner is
     * always closed; beans appended before a failure stay in {@code sink}.
     */
    @SuppressWarnings("unchecked") // cloneBean returns Object; the clone has the runtime type of the mapped bean
    private <T> void collectBeans(Table table, Scan scan, T obj, String tableName, List<T> sink)
            throws Exception {
        ResultScanner scanner = table.getScanner(scan);
        try {
            for (Result result : scanner) {
                sink.add((T) BeanUtils.cloneBean(HBaseBeanUtil.resultToBeanNew(result, obj, tableName)));
            }
        } finally {
            scanner.close();
        }
    }

    /**
     * Resolves a table handle by name. The four known tables use the handles cached on
     * {@code HconnectionFactory}, so that not every operation acquires a new one; any other
     * name is looked up on the shared connection.
     *
     * <p>NOTE(review): handles obtained from the connection for non-cached names are never
     * closed by this class - confirm whether that is acceptable.
     *
     * @param tableName table to resolve
     * @return a non-null table handle
     * @throws IOException when the lookup fails or the resolved handle is null
     */
    private Table getTable(String tableName) throws IOException {
        Table table;
        if ("bn_user".equals(tableName)) {
            table = HconnectionFactory.UserTable;
        } else if ("bn_notice_user".equals(tableName)) {
            table = HconnectionFactory.NoticeUserTable;
        } else if ("bn_notice".equals(tableName)) {
            table = HconnectionFactory.NoticeTable;
        } else if ("bn_message".equals(tableName)) {
            table = HconnectionFactory.MessageTable;
        } else {
            // The lookup result must be kept; discarding it left the handle null.
            table = HconnectionFactory.connection.getTable(TableName.valueOf(tableName));
        }
        if (table == null) {
            throw new IOException("HBase table handle unavailable: " + tableName);
        }
        return table;
    }
}
HBaseBeanUtil:
package com.muheda.notice.hbase;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.List;

/**
 * Reflection-based conversion between beans annotated with {@link HbaseColumn} and
 * HBase {@code Put} / {@code Result} objects.
 *
 * @Author: Sorin
 * @Date: Created in 2018/3/22
 */
public class HBaseBeanUtil {

    private static final Logger logger = LoggerFactory.getLogger(HBaseBeanUtil.class);

    /**
     * Converts a bean into a {@code Put}. The row key is the bean's {@code id} field, and
     * every other {@link HbaseColumn}-annotated field with a non-null value becomes one cell
     * holding the value's {@code toString()}.
     *
     * <p>Fields mapped to the row key, fields with a blank family or qualifier, null values
     * and array values are skipped. Blank (non-null) values are still written, so clearing a
     * field overwrites the stored cell.
     *
     * @param <T> bean type
     * @param obj bean to convert
     * @return the populated put
     * @throws Exception when a field cannot be read or the row key is rejected by {@code Put}
     */
    public static <T> Put beanToPut(T obj) throws Exception {
        Put put = new Put(Bytes.toBytes(parseObjId(obj)));
        for (Field field : obj.getClass().getDeclaredFields()) {
            HbaseColumn orm = field.getAnnotation(HbaseColumn.class);
            if (orm == null) {
                continue;
            }
            String family = orm.family();
            String qualifier = orm.qualifier();
            if (StringUtils.isBlank(family) || StringUtils.isBlank(qualifier)) {
                continue;
            }
            if ("rowkey".equalsIgnoreCase(qualifier) || "rowkey".equalsIgnoreCase(family)) {
                continue;
            }
            field.setAccessible(true);
            Object value = field.get(obj);
            if (value == null) {
                // Unset fields produce no cell; dereferencing them used to throw.
                continue;
            }
            if (value.getClass().isArray()) {
                // Arrays have no usable toString(); report and skip instead of storing garbage.
                logger.error("nonsupport");
                continue;
            }
            put.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier),
                    Bytes.toBytes(value.toString()));
        }
        return put;
    }

    /**
     * Overload taking the table name, as called by {@code HBaseDaoUtil}.
     *
     * <p>NOTE(review): {@code tableName} is not used; the mapping comes from the field
     * annotations. Confirm whether a table-specific mapping was intended.
     *
     * @param <T>       bean type
     * @param obj       bean to convert
     * @param tableName target table name (currently unused)
     * @return the populated put
     * @throws Exception see {@link #beanToPut(Object)}
     */
    public static <T> Put beanToPut(T obj, String tableName) throws Exception {
        return beanToPut(obj);
    }

    /**
     * Reads the bean's {@code id} field, which is used as the row key.
     *
     * @param <T> bean type
     * @param obj bean to inspect
     * @return the id's {@code toString()}, or an empty string when the field is missing,
     *         inaccessible or null
     */
    public static <T> String parseObjId(T obj) {
        Class<?> clazz = obj.getClass();
        try {
            Field field = clazz.getDeclaredField("id");
            field.setAccessible(true);
            Object id = field.get(obj);
            return id == null ? "" : id.toString();
        } catch (NoSuchFieldException e) {
            logger.error("", e);
        } catch (SecurityException e) {
            logger.error("", e);
        } catch (IllegalArgumentException e) {
            logger.error("", e);
        } catch (IllegalAccessException e) {
            logger.error("", e);
        }
        return "";
    }

    /**
     * Populates the given bean from an HBase result by calling the public setter of each
     * {@link HbaseColumn}-annotated field. The setter is looked up with the field's declared
     * type and invoked with a {@code String}, so the mapped fields are expected to be strings.
     *
     * @param <T>    bean type
     * @param result row to read; may be null
     * @param obj    bean to populate
     * @return the same {@code obj} instance, or null when {@code result} is null
     * @throws Exception when a setter is missing or cannot be invoked
     */
    public static <T> T resultToBean(Result result, T obj) throws Exception {
        if (result == null) {
            return null;
        }
        Class<?> clazz = obj.getClass();
        for (Field field : clazz.getDeclaredFields()) {
            HbaseColumn orm = field.getAnnotation(HbaseColumn.class);
            if (orm == null) {
                continue;
            }
            String family = orm.family();
            String qualifier = orm.qualifier();
            if (StringUtils.isBlank(family) || StringUtils.isBlank(qualifier)) {
                continue;
            }
            String value;
            if ("rowkey".equalsIgnoreCase(family)) {
                byte[] row = result.getRow();
                value = row == null ? "" : Bytes.toString(row);
            } else {
                value = getResultValueByType(result, family, qualifier, orm.timestamp());
            }
            String fieldName = field.getName();
            String setMethodName =
                    "set" + fieldName.substring(0, 1).toUpperCase() + fieldName.substring(1);
            Method setMethod = clazz.getMethod(setMethodName, new Class[] { field.getType() });
            setMethod.invoke(obj, new Object[] { value });
        }
        return obj;
    }

    /**
     * Overload taking the table name, as called by {@code HBaseDaoUtil}.
     *
     * <p>NOTE(review): {@code tableName} is not used; the mapping comes from the field
     * annotations. Confirm whether a cached, table-specific mapping was intended.
     *
     * @param <T>       bean type
     * @param result    row to read; may be null
     * @param obj       bean to populate
     * @param tableName source table name (currently unused)
     * @return the same {@code obj} instance, or null when {@code result} is null
     * @throws Exception see {@link #resultToBean(Result, Object)}
     */
    public static <T> T resultToBeanNew(Result result, T obj, String tableName) throws Exception {
        return resultToBean(result, obj);
    }

    /**
     * Reads one column from a result as text.
     *
     * @param result    row to read
     * @param family    column family
     * @param qualifier column qualifier
     * @param timeStamp when true, return the cell timestamp instead of the cell value
     * @return the cell value; or, in timestamp mode, the timestamp when exactly one cell
     *         exists; otherwise an empty string (including when the column is absent)
     */
    private static String getResultValueByType(Result result, String family, String qualifier, boolean timeStamp) {
        byte[] familyBytes = Bytes.toBytes(family);
        byte[] qualifierBytes = Bytes.toBytes(qualifier);
        if (!timeStamp) {
            // A column missing from the row yields a null value.
            byte[] raw = result.getValue(familyBytes, qualifierBytes);
            return raw == null ? "" : Bytes.toString(raw);
        }
        List<Cell> cells = result.getColumnCells(familyBytes, qualifierBytes);
        if (cells.size() == 1) {
            return cells.get(0).getTimestamp() + "";
        }
        return "";
    }
}
至此,HBase 的操作我们已封装好了,可以直接像下面这样使用:
/**
 * Example DAO showing how a {@code Demo} entity is stored and read through
 * {@code HBaseDaoUtil}.
 */
@Component("demoDao")
public class DemoDao {

    @Autowired
    private HBaseDaoUtil hBaseDaoUtil;

    /**
     * Stores the given entity by delegating to {@code HBaseDaoUtil.save}.
     *
     * @param demo entity to store
     */
    public void save(Demo demo) {
        hBaseDaoUtil.save(demo);
    }

    /**
     * Looks up entities by row key by delegating to {@code HBaseDaoUtil.get}.
     *
     * @param demo template entity passed through to the helper
     * @param id   row key to fetch
     * @return the list returned by the helper
     */
    public List<Demo> getById(Demo demo, String id) {
        final List<Demo> matches = hBaseDaoUtil.get(demo, id);
        return matches;
    }
}