本文为实现分布式任务调度系统中用到的一些关键技术点分享——Consistent Hashing算法原理和Java实现,以及效果测试。(代码实现见:https://github.com/yaohonv/pingpong/tree/master/consistenthashing)java
一致性Hashing在分布式系统中常常会被用到, 用于尽量地下降节点变更带来的数据迁移开销。Consistent Hashing算法在1997年就在论文Consistenthashing and random trees中被提出。node
先来简单理解下Hash是解决什么问题。假设一个分布式任务调度系统,执行任务的节点有n台机器,现有m个job在这n台机器上运行,这m个Job须要逐一映射到n个节点中一个,这时候能够选择一种简单的Hash算法来让m个Job能够均匀分布到n个节点中,好比 hash(Job)%n ,看上去很完美,但考虑以下两种情形:git
一、2两种情形能够看到,基本上全部的Job会被从新分配到跟节点变更前不一样的节点上,意味着须要迁移几乎全部正在运行的Job,想一想这样会给系统带来多大的复杂性和性能损耗。github
另外还有一种状况,假设节点的硬件处理性能不彻底一致,想让性能高的节点多被分配一些Job,这时候上述简单的Hash映射算法更是很难作到。算法
如何解决这种节点变更带来的大量数据迁移和数据不均匀分配问题呢?一致性哈希算法就很巧妙的解决了这些问题。缓存
Consistent Hashing是一种Hashing算法,典型的特征是:在减小或者添加节点时,能够尽量地保证已经存在Key映射关系不变,尽量地减小Key的迁移。数据结构
给定值空间2^32,[0,2^32]是全部hash值的取值空间,形象地描述为以下一个环(ring):app
2. 节点向值空间映射dom
将节点Node向这个值空间映射,取Node的Hash值,选取一个能够固定标识一个Node的属性值进行Hashing,假设以字符串形式输入,算法以下:分布式
能够取Node标识的md5值,而后截取其中32位做为映射值。md5取值以下:
private byte[] md5(String value) { MessageDigest md5; try { md5 = MessageDigest.getInstance("MD5"); } catch (NoSuchAlgorithmException e) { throw new IllegalStateException(e.getMessage(), e); } md5.reset(); byte[] bytes; try { bytes = value.getBytes("UTF-8"); } catch (UnsupportedEncodingException e) { throw new IllegalStateException(e.getMessage(), e); } md5.update(bytes); return md5.digest(); }
由于映射值只须要32位便可,因此能够利用如下方式计算最终值(number取0便可):
private long hash(byte[] digest, int number) { return (((long) (digest[3 + number * 4] & 0xFF) << 24) | ((long) (digest[2 + number * 4] & 0xFF) << 16) | ((long) (digest[1 + number * 4] & 0xFF) << 8) | (digest[0 + number * 4] & 0xFF)) & 0xFFFFFFFFL; }
把n个节点Node经过以上方式取得hash值,映射到环形值空间以下:
算法中,将以有序Map的形式在内存中缓存每一个节点的Hash值对应的物理节点信息。缓存于这个内存变量中:private final TreeMap<Long, String> virtualNodes 。
3. 数据向值空间映射
数据Job取hash的方式跟节点Node的方式如出一辙,可使用上述md5->hash的方式一样将全部Job取得Hash映射到这个环中。
4. 数据和节点映射
当节点和数据都被映射到这个环上后,能够设定一个规则把哪些数据hash值放在哪些节点Node Hash值上了,规则就是,沿着顺时针方向,数据hash值向后找到第一个Node Hash值即认为该数据hash值对应的数据映射到该Node上。至此,这一个从数据到节点的映射关系就肯定了。
顺时针找下一个Node Hash值算法以下:
public String select(Trigger trigger) { String key = trigger.toString(); byte[] digest = md5(key); String node = sekectForKey(hash(digest, 0)); return node; } private String sekectForKey(long hash) { String node; Long key = hash; if (!virtualNodes.containsKey(key)) { SortedMap<Long, String> tailMap = virtualNodes.tailMap(key); if (tailMap.isEmpty()) { key = virtualNodes.firstKey(); } else { key = tailMap.firstKey(); } } node = virtualNodes.get(key); return node; }
Trigger是对Job一次触发任务的抽象,这里可忽略关注,重写了toString方法返回一个标记一个Job的惟一标志,计算Hash值,从节点Hash值中按规则寻找。 虚拟节点后续介绍。
接下来就能够见识下一致性哈希基于这样的数据结构是如何发挥前文提到的优点的。
1. 节点减小时,看须要迁移的节点状况
假设Node_1宕掉了,图中数据对象只有Job_1会被从新映射到Node_k,而其余Job_x扔保持原有映射关系不变。
2. 节点新增时
假设新增Node_i,图中数据对象只有Job_k会被从新映射到Node_i上,其余Job_x一样保持原有映射关系不变。
上述算法过程,会想到两个问题,第一,数据对象会不会分布不均匀,特别是新增节点或者减小节点时;第二,前文提到的若是想让部分节点多映射到一些数据对象,如何处理。虚拟节点这是解决这个问题。
将一个物理节点虚拟出必定数量的虚拟节点,分散到这个值空间上,须要尽量地随机分散开。
假设有4个物理节点Node,环上的每一个色块表明一个虚拟节点涵盖的hash值区域,每种颜色表明一个物理节点。当物理节点较少时,虚拟节点数须要更高来确保更好的一致性表现。经测试,在物理节点为个位数时,虚拟节点可设置为160个,此时可带来较好的表现(后文会给出测试结果,160*n个总节点数状况下,若是发生一个节点变更,映射关系变化率基本为1/n,达到预期)。
具体作算法实现时,已知物理节点,虚拟节点数设置为160,可将这160*n的节点计算出Hash值,以Hash值为key,以物理节点标识为value,以有序Map的形式在内存中缓存,做为后续计算数据对象对应的物理节点时的查询数据。代码以下,virtualNodes中缓存着全部虚拟节点Hash值对应的物理节点信息。
public ConsistentHash(List<String> nodes) { this.virtualNodes = new TreeMap<>(); this.identityHashCode = identityHashCode(nodes); this.replicaNumber = 160; for (String node : nodes) { for (int i = 0; i < replicaNumber / 4; i++) { byte[] digest = md5(node.toString() + i); for (int h = 0; h < 4; h++) { long m = hash(digest, h); virtualNodes.put(m, node); } } } }
以上详细介绍了一致性哈希(Consistent Hashing)的算法原理和实现过程,接下来给出一个测试结果:
以10个物理节点,160个虚拟节点,1000个数据对象作测试,10个物理节点时,这1000个数据对象映射结果以下:
减小一个节点前,path_7节点数据对象个数:113
减小一个节点前,path_0节点数据对象个数:84
减小一个节点前,path_6节点数据对象个数:97
减小一个节点前,path_8节点数据对象个数:122
减小一个节点前,path_3节点数据对象个数:102
减小一个节点前,path_2节点数据对象个数:99
减小一个节点前,path_4节点数据对象个数:98
减小一个节点前,path_9节点数据对象个数:102
减小一个节点前,path_1节点数据对象个数:99
减小一个节点前,path_5节点数据对象个数:84
减小一个物理节点path_9,此时9个物理节点,原有1000个数据对象映射状况以下:
减小一个节点后,path_7节点数据对象个数:132
减小一个节点后,path_6节点数据对象个数:107
减小一个节点后,path_0节点数据对象个数:117
减小一个节点后,path_8节点数据对象个数:134
减小一个节点后,path_3节点数据对象个数:104
减小一个节点后,path_4节点数据对象个数:104
减小一个节点后,path_2节点数据对象个数:115
减小一个节点后,path_5节点数据对象个数:89
减小一个节点后,path_1节点数据对象个数:98
先从数量上对比下每一个物理节点上数据对象的个数变化:
减小一个节点后,path_7节点数据对象个数从113变为132
减小一个节点后,path_6节点数据对象个数从97变为107
减小一个节点后,path_0节点数据对象个数从84变为117
减小一个节点后,path_8节点数据对象个数从122变为134
减小一个节点后,path_3节点数据对象个数从102变为104
减小一个节点后,path_4节点数据对象个数从98变为104
减小一个节点后,path_2节点数据对象个数从99变为115
减小一个节点后,path_5节点数据对象个数从84变为89
减小一个节点后,path_1节点数据对象个数从99变为98
能够看到基本是均匀变化,如今逐个对比每一个数据对象先后映射到的物理节点,发生变化的数据对象占比状况,统计以下:
数据对象迁移比率:0.9%
该结果基本体现出一致性哈希所能带来的最佳表现,尽量地减小节点变更带来的数据迁移。
最后附上完整的算法代码,供你们参照。代码中数据对象是以Trigger抽象,能够调整成特定场景的,便可运行测试。 https://github.com/yaohonv/pingpong/tree/master/consistenthashing
package com.cronx.core.common; import com.cronx.core.entity.Trigger; import java.io.UnsupportedEncodingException; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; import java.util.Collections; import java.util.List; import java.util.SortedMap; import java.util.TreeMap; /** * Created by echov on 2018/1/9. */ public class ConsistentHash { private final TreeMap<Long, String> virtualNodes; private final int replicaNumber; private final int identityHashCode; private static ConsistentHash consistentHash; public ConsistentHash(List<String> nodes) { this.virtualNodes = new TreeMap<>(); this.identityHashCode = identityHashCode(nodes); this.replicaNumber = 160; for (String node : nodes) { for (int i = 0; i < replicaNumber / 4; i++) { byte[] digest = md5(node.toString() + i); for (int h = 0; h < 4; h++) { long m = hash(digest, h); virtualNodes.put(m, node); } } } } private static int identityHashCode(List<String> nodes){ Collections.sort(nodes); StringBuilder sb = new StringBuilder(); for (String s: nodes ) { sb.append(s); } return sb.toString().hashCode(); } public static String select(Trigger trigger, List<String> nodes) { int _identityHashCode = identityHashCode(nodes); if (consistentHash == null || consistentHash.identityHashCode != _identityHashCode) { synchronized (ConsistentHash.class) { if (consistentHash == null || consistentHash.identityHashCode != _identityHashCode) { consistentHash = new ConsistentHash(nodes); } } } return consistentHash.select(trigger); } public String select(Trigger trigger) { String key = trigger.toString(); byte[] digest = md5(key); String node = sekectForKey(hash(digest, 0)); return node; } private String sekectForKey(long hash) { String node; Long key = hash; if (!virtualNodes.containsKey(key)) { SortedMap<Long, String> tailMap = virtualNodes.tailMap(key); if (tailMap.isEmpty()) { key = virtualNodes.firstKey(); } else { key = tailMap.firstKey(); } } node = virtualNodes.get(key); return node; } private long hash(byte[] digest, int number) { return (((long) (digest[3 + number * 4] & 0xFF) << 24) | ((long) (digest[2 + number * 4] & 0xFF) << 16) | ((long) (digest[1 + number * 4] & 0xFF) << 8) | (digest[0 + number * 4] & 0xFF)) & 0xFFFFFFFFL; } private byte[] md5(String value) { MessageDigest md5; try { md5 = MessageDigest.getInstance("MD5"); } catch (NoSuchAlgorithmException e) { throw new IllegalStateException(e.getMessage(), e); } md5.reset(); byte[] bytes; try { bytes = value.getBytes("UTF-8"); } catch (UnsupportedEncodingException e) { throw new IllegalStateException(e.getMessage(), e); } md5.update(bytes); return md5.digest(); } }