最近有一个项目,其中某个功能单表数据在可预估的将来达到了亿级,初步估算在90亿左右。与同事详细讨论后,决定采用一致性Hash算法来完成数据库的自动扩容和数据迁移。整个程序细节由我同事完成,我只是将其理解并成文,供有相同问题的同行参考。html
参看此文的兄弟,默认各位已经熟悉一致性hash算法了。此文仅仅阐述代码细节,实现语言为Java
。node
项目正式上线初期,数据量不会直接爆发式增加到90亿,须要时间上的积累(逐步作实验),最终可能达到90亿数据,甚至超过90亿数据。git
按照咱们实际了解状况,oracle存储数据量达到1千万的时候,性能擅可。而Oracle官方的说法,如单表存储1g有分区(大体500万数据),查询效率很是高。而试验表中仅四个字段,每条数据数据量较小。因此咱们最终决定以1000万为节点,水平拆表。当表数据达到1千万时,即增长下一波表。进行数据自动迁移。github
按照90亿的总量,1000万数据一个表的划分,最终大体会产生900个左右的表。因此咱们最终使用了4个数据库。1个存储其余业务模块的表,3个存储此大数据表。每一个数据库大体有300张表。性能上和数量上均可达到咱们的要求。算法
试验信息表(EXPERIMENT_MESSAGE),挂接车型和试验的关系。试验数据表(EXPERIMENT_DATA),存储试验数据数据库
试验信息表:数据结构
字段 | 含义 |
---|---|
ID | 主键,采用UUID生成 |
EXPERIMENT_ID | 试验表中的ID |
CAR_ID | 车型表中的ID |
... | 其他数十个字段省略 |
试验数据表:oracle
字段 | 含义 |
---|---|
ID | 主键,采用UUID生成 |
EXPERIMENT_MESSAGE_ID | 对应的实验信息id |
X_VALUE | 试验数据X值 |
Y_VALUE | 试验数据Y值 |
咱们采用做一致性hash的key,就是试验数据表中的EXPERIMENT_MESSAGE_ID
字段。也就是说,每一个试验数据表,不存则以,存则一次性大体有6000条数据。取同理。性能
一致性Hash算法的hash部分,采用了著名的ketama算法。在此,咱们很少讨论ketama算法的细节,若各位有兴趣,请查阅ketama算法大数据
public long hash(String key) { if (md5 == null) { try { md5 = MessageDigest.getInstance("MD5"); } catch (NoSuchAlgorithmException e) { throw new IllegalStateException("no md5 algorythm found"); } } md5.reset(); md5.update(key.getBytes()); byte[] bKey = md5.digest(); long res = ((long) (bKey[3] & 0xFF) << 24) | ((long) (bKey[2] & 0xFF) << 16) | ((long) (bKey[1] & 0xFF) << 8) | (long) (bKey[0] & 0xFF); return res & 0xffffffffL; }
有了Hash的算法,接下来就要构造Hash环了。Hash环采用的SortedMap数据结构实现。
private final SortedMap<Long, T> circle = new TreeMap<Long, T>();
其中添加节点和移除节点部分,须要根据hash算法获得节点在环上的位置,具体代码以下:
/** * 添加虚拟节点 * numberOfReplicas为虚拟节点的数量,初始化hash环的时候传入,咱们使用300个虚拟节点 * @param node */ public void add(T node) { for (int i = 0; i < numberOfReplicas; i++) { circle.put(hashFunction.hash(node.toString() + i), node); } } /** * 移除节点 * @param node */ public void remove(T node) { for (int i = 0; i < numberOfReplicas; i++) { circle.remove(hashFunction.hash(node.toString() + i)); } }
而hash环中获得节点部分比较特殊,根据一致性hash算法的介绍,获得hash环中的节点,其实是计算出的hash值顺时针找到的第一个节点。
/** * 得到一个最近的顺时针节点 * @param key 为给定键取Hash,取得顺时针方向上最近的一个虚拟节点对应的实际节点 * @return */ public T get(Object key) { if (circle.isEmpty()) { return null; } long hash = hashFunction.hash((String) key); if (!circle.containsKey(hash)) { //返回此映射的部分视图,其键大于等于 hash SortedMap<Long, T> tailMap = circle.tailMap(hash); hash = tailMap.isEmpty() ? circle.firstKey() : tailMap.firstKey(); } return circle.get(hash); }
上面完成了一致性hash算法的实现,包含了hash算法和hash环的实现。接下来就要处理具体业务中,如何使用这个hash环和算法了。
咱们业务中,主要操做这张表的数据,也就是增删查。而后咱们数据库拆分红了3个,因此须要增删查的操做基本一致,都是先经过一致性hash获得库,再经过一致性hash获得表。
获取数据库名的操做以下,获取到数据库后,根据数据库名到对应的链接池中获取链接。
/** * 根据试验信息id获取其所在库名 * DatabaseType为咱们数据的枚举 * @return 数据库的名称 **/ private String getDataBase(String experimentMessageId) { //获取数据源 DatabaseType[] databasetype = DatabaseType.values(); List<String> dataBaselist = new ArrayList<>(); Map<String, DatabaseType> map = new HashMap<>(); for (DatabaseType d:databasetype) { if (!d.equals(DatabaseType.KC)) { dataBaselist.add(d.toString()); map.put(d.toString(), d); } } //获取数据源hash ConsistentHash<String> dataBaseCon = getConsistentHash(dataBaselist); //获取id所在数据源 String dataBase = dataBaseCon.get(experimentMessageId); return dataBase; }
获取表名的操做以下,获取到数据库后,在对应的数据库中找到须要的表,再从该表中查询数据。
/** * 根据试验信息id获取其试验数据所在表 * @return **/ public String getTableName(String experimentMessageId) { String dataBase = getDataBase(experimentMessageId); //查询全部试验数据表 List<String> tables = experimentDataEODao.queryTbaleNames(dataBase, tableName); ConsistentHash<String> consistentHash = getConsistentHash(tables); String tableName = consistentHash.get(experimentMessageId); return tableName; }
剩下的增删改操做和日常一致,在此很少赘述。
一致性hash势必涉及到数据迁移问题,咱们采起的数据迁移方式为定时任务,针对每一个数据库在天天夜里全量扫描一次。检查是否有数据量超过1000万的表,若存在这样的表,就把现有的表数量double。
数据迁移只会在同库之间迁移,不会涉及跨数据库的状况。
此方案为初步方案,后续会改进的更加智能,根据表的数量,增长不一样数量的表。而不是简单的把表数量翻倍。
表建立后,将须要迁移的表数据逐个迁移。
在链接到数据源后,咱们作了以下事情进行数据迁移
1.获取库中全部的表
List<String> tables = getTables(connection, p, d.toString());
2.遍历表,检查表中数据是否超过边界线(咱们为1000万)
for (int i = 0; i < tables.size(); i++) { //查询表内数据量 int num = countByTableName(connection, p, tables.get(i)); //finalNum为边界值,此处为1000万 if (num > finalNum) { …… } …… }
3.根据全部的表计算现有的虚拟节点
ConsistentHash<String> consistentHashOld = getConsistentHash(tables);
4.把表加倍
List<String> tablesNew = deepCopy(tables); //注意必定要采用深复制 int tableSize = tablesNew.size(); for (int y = 0; y < tableSize; y++) { String tableNameNew = tableName + (tablesNew.size() + 1); //建立表 createTable(connection, p, d.toString(), tableNameNew); tablesNew.add(tableNameNew); tableDelete.add(tableNameNew); }
5.计算加倍后的虚拟节点
ConsistentHash<String> consistentHashNew = getConsistentHash(tablesNew);
6.数据迁移
for (int z = 0; z < tableSize; z++) { String tableNameOld = tablesNew.get(z); //查询试验信息id不重复的试验数据信息 List<String> disData = selectExperimentIdDis(connection, p, tableNameOld); List<String> deleteList = new LinkedList<>(); for (String experimentId : disData) { //若是数据hash计算 原所在表与新建表以后不一致,执行转移 if (!consistentHashNew.get(experimentId).equals(consistentHashOld.get(experimentId))) { //新增到新表数据 insertHash(connection, p, experimentId, consistentHashOld.get(experimentId), consistentHashNew.get(experimentId)); //删除数据集合 deleteList.add(experimentId); //删除旧表数据 final int defaultDelNum = 1000; if (deleteList.size() == defaultDelNum) { deleteInbatch(connection, p, deleteList, tableNameOld); deleteList.clear(); } } } //删除旧表数据 if (deleteList.size() > 0) { deleteInbatch(connection, p, deleteList, tableNameOld); } }
以上为咱们所作的一致性hash实践,其中还存在不少问题,好比迁移过程单线程致使迁移较慢、自动扩容机制不智能、迁移过程当中数据访问不稳定等状况。
咱们将会在后续的开发中逐步进行完善改进。
以上就是咱们针对一致性hash在oracle分表中的实践