【工程化】一致性hash

介绍

Consistent hashing,一致性hash最先是由David Karger等人在《Consistent Hashing and Random Trees:Distributed Caching Protocols for Relieving Hot Spots on the World Wide Web》论文中提出的,为的是解决分布式网络中减小或消除热点问题的发生而提出的缓存协议。node

论文指出了一致性的4个特性:web

  1. Balance,平衡性是指哈希的结果可以尽量分布到全部的缓存中去,这样可使得全部的缓存空间都获得利用。The balance property is what is prized about standard hash functions: they distribute items among buckets in a balanced fasion。
  2. Monotonicity,单调性是指若是已经有一些item经过哈希分派到了相应的bucket中,又有新的bucket加入到系统中。哈希的结果应可以保证原有已分配的item能够被映射到原有的或者新的bucket中去,而不会被映射到旧的bucket集合中的其余bucket中。This property says that if items are initially assigned to a set of buckets V1 and then some new buckets are added to form V2, then an item may move from an old bucket to a new bucket, but not from one old bucket to another. This reflects one intuition about consistency: when the set of usable buckets changes, items should only move if necessary to preserve an even distribution.
  3. Spread,分散性是指在分布式环境中,终端有可能看不到全部的buckets,而是只能看到其中的一部分。当终端但愿经过哈希过程将内容映射到bucket上时,因为不一样终端所见的buckets范围有可能不一样,从而致使哈希的结果不一致,最终的结果是相同的内容被不一样的终端映射到不一样的bucket中。这种状况显然是应该避免的,由于它致使相同内容被存储到不一样bucket去,下降了系统存储的效率。分散性的定义就是上述状况发生的严重程度。好的哈希算法应可以尽可能避免不一致的状况发生,也就是尽可能下降分散性。 The idea behind spread is that there are V people, each of whom can see at least a constant fraction ( 1/t ) of the buckets that are visible to anyone. Each person tries to assign an item i to a bucket using a consistent hash function. The property says that across the entire group, there are at most i different opinions about which bucket should contain the item. Clearly, a good consistent hash function should have low spread over all item.
  4. Load,负载问题其实是从另外一个角度看待分散性问题。既然不一样的终端可能将相同的内容映射到不一样的缓冲区中,那么对于一个特定的缓冲区而言,也可能被不一样的用户映射为不一样的内容。与分散性同样,这种状况也是应当避免的,所以好的哈希算法应可以尽可能下降缓冲的负荷。The load property is similar to spread. The same V people are back, but this time we consider a particular bucket b instead of an item. The property says that there are at most b distinct items that at least one person thinks belongs in the bucket. A good consistent hash function should also have low load.

常见的使用

一些已知的场景如:算法

  • memcached的分布式缓存访问
  • 用做负载均衡,如dubbo的ConsistentHashLoadBalance
  • 分布式哈希表(DHT,Distributed Hash Table)用来在一群节点中实现(key, value)的关系映射。在相似Cassandra等分布式系统中使用了DHT

一致性hash更多的应用在负载均衡。缓存

问题提出

通常在分布式系统设计中,若是咱们将某些用户请求、或者某些城市数据,访问指定的某台机器,通常的算法是基于关键字取hash值而后%机器数(hash(key)% N)。
假设咱们有3台机器A、B 、C,后来新加了一台机器D,其索引与机器映射以下:服务器

clipboard.png

针对不一样的key,其hashcode为1-10取模运算:网络

clipboard.png

通过上面的表格能够看到,当添加了一台新机器D的时候,致使大部分key产生了miss,命中率按照上面表格计算只为20%。虽然是一个简单的列子,但足以说明该算法在机器伸缩时候,会形成大量的数据没法被正确被命中。若是这是缓存架构设计,那么缓存miss后会把请求都落在DB上,形成DB压力。若是这是个分布式业务调用,原来访问机器可能作了配置数据、或缓存了上下文等,miss就意味着本次调用失败。数据结构

就上面的case,这个算法自己违背了“单调性” 设计特性。架构

单调性是指若是已经有一些item经过哈希分派到了相应的bucket中,又有新的bucket加入到系统中。哈希的结果应可以保证原有已分配的item能够被映射到原有的或者新的bucket中去,而不会被映射到旧的bucket集合中的其余bucket中

Consistent Hashing 算法

先构造一个长度为2^32的整数环(这个环被称为一致性Hash环),根据节点名称的Hash值(其分布为[0, 2^32-1])将缓存服务器节点放置在这个Hash环上,而后根据须要缓存的数据的Key值计算获得其Hash值(其分布也为[0, 2^32-1]),而后在Hash环上顺时针查找距离这个Key值的Hash值最近的服务器节点,完成Key到服务器的映射查找。

clipboard.png

以上经过特定的Hash函数f=h(x),
(1)计算出Node节点,而后散列到一致性Hash环上:负载均衡

Node节点的hash值:
h(Node1)=K1
h(Node2)=K2
h(Node3)=K3dom

(2)计算出对象的hash值,而后以顺时针的方向计算,将全部对象存储到离本身最近的机器中。

h(object1)=key1
h(object2)=key2
h(object3)=key3
h(object4)=key4

当发生机器节点Node的添加和删除时:

(1)机器节点Node增长,新增一个节点Node4
计算出h(Node4)=K4,将其映射到一致性Hash环上以下:

clipboard.png

经过按顺时针迁移的规则,那么object3被迁移到了NODE4中,其它对象还保持原有的存储位置。

(2)机器节点Node删除,删除节点Node2

clipboard.png

经过顺时针迁移的规则,那么object2被迁移到Node3中,其余对象还保持原有的存储位置。

经过对节点的添加和删除的分析,一致性哈希算法在保持了单调性的同时,仍是数据的迁移达到了最小,这样的算法对分布式集群来讲是很是合适的,避免了大量数据迁移,减少了服务器的的压力。

算法实现

根据以前的算法的描述,使得Node节点基于其hash值大小,按顺序分布在[0-2^32-1]这个环上,而后根据object的hash值,查找
a、hash值相等,返回这个节点Node。
b、大于它hash值的第一个,返回这个节点Node。

1)选择合适的数据结构:

论文中提到:

clipboard.png

官方建议实现可使用平衡二叉树。如AVL、红黑树

2)选择合适的Hash函数,足够散列。

先看下Java String的hashcode:

public static void main(String[] args) {
    System.out.println("192.168.0.1:1111".hashCode());
    System.out.println("192.168.0.2:1111".hashCode());
    System.out.println("192.168.0.3:1111".hashCode());
    System.out.println("192.168.0.4:1111".hashCode());
 }
散列值:1874499238
1903128389
1931757540
1960386691

2^32-1 = 4294967296
若是咱们把上面4台机器Node分布到[0-2^32-1]这个环上,取值的范围只是一个很小的范围区间,这样90%的请求将会落在Node1这个节点,这样的分布是在太糟糕了。

clipboard.png

所以咱们要寻找一种冲突较小,且分布足够散列。一些hash函数有CRC32_HASH、FNV1_32_HASH、KETAMA_HASH、MYSQL_HASH,如下是一张各hash算法的比较(未验证,来自网络)

clipboard.png

简单判断是FNV1_32_HASH不错,KETAMA_HASH是MemCache推荐的一致性Hash算法。

代码实现

public class ConsistentHashingWithoutVirtualNode {

    /**
     * key表示服务器的hash值,value表示服务器的名称
     */
    private static SortedMap<Integer, String> sortedMap =
            new TreeMap<Integer, String>();

    /**
     * 使用FNV1_32_HASH算法计算服务器的Hash值,这里不使用重写hashCode的方法,最终效果没区别
     */
    private static int getFNV1_32_HASHHash(String str) {
        final int p = 16777619;
        int hash = (int) 2166136261L;
        for (int i = 0; i < str.length(); i++)
            hash = (hash ^ str.charAt(i)) * p;
        hash += hash << 13;
        hash ^= hash >> 7;
        hash += hash << 3;
        hash ^= hash >> 17;
        hash += hash << 5;

        // 若是算出来的值为负数则取其绝对值
        if (hash < 0)
            hash = Math.abs(hash);
        return hash;
    }

    /**
     * 待添加入Hash环的服务器列表
     */
    private static String[] servers = {"192.168.0.1:111", "192.168.0.2:111", "192.168.0.3:111",
            "192.168.0.3:111", "192.168.0.4:111"};

    /**
     * 程序初始化,将全部的服务器放入sortedMap中
     */
    static {
        for (int i = 0; i < servers.length; i++) {
            int hash = getFNV1_32_HASHHash(servers[i]);
            System.out.println("[" + servers[i] + "]加入集合中, 其Hash值为" + hash);
            sortedMap.put(hash, servers[i]);
        }
        System.out.println();
    }

    /**
     * 获得应当路由到的结点
     */
    private static String getServer(String node) {
        // 获得带路由的结点的Hash值
        int hash = getFNV1_32_HASHHash(node);
        if (!sortedMap.containsKey(hash)) {
            // 获得大于该Hash值的全部Map
            SortedMap<Integer, String> tailMap =
                    sortedMap.tailMap(hash);
            if (!tailMap.isEmpty()) {
                // 第一个Key就是顺时针过去离node最近的那个结点
                return sortedMap.get(tailMap.firstKey());
            } else {
                return sortedMap.get(sortedMap.firstKey());
            }
        }
        return sortedMap.get(hash);
    }


    public static void main(String[] args) {
        String[] nodes = {"hello1", "hello2", "hello3"};
        for (int i = 0; i < nodes.length; i++)
            System.out.println("[" + nodes[i] + "]的hash值为" +
                    getFNV1_32_HASHHash(nodes[i]) + ", 被路由到结点[" + getServer(nodes[i]) + "]");
    }

算法的缺陷

一致性hashing虽然知足了单调性和负载均衡的特性以及通常hash算法的分散性。可是不知足“平衡性”。

Balance,平衡性是指哈希的结果可以尽量分布到全部的缓存中去,这样可使得全部的缓存空间都获得利用。

该算法中,Hash函数是不能保证平衡的,如上面分析的,当集群中发生节点添加时,该节点会承担一部分数据访问,当集群中发生节点删除时,被删除的节点P负责的数据就会落在下一个节点Q上,这样势必会加剧Q节点的负担。这就是发生了不平衡。

解决

引入虚拟节点。Virtual Node,是实际节点的复制品Replica。
好比集群中如今有2个节点Node一、Node3,就是那个删除Node2的图,

clipboard.png

每一个节点引入2个副本,Node1-一、Node1-2,Node3-一、Node3-2

clipboard.png

如此引入虚拟节点,使得对象的分布比较均衡。那么对于节点,物理节点和虚拟节点之间的映射以下:

clipboard.png

到此,该算法的改进已经完成,不过要用在工程中,仍有几个问题需解决:

  1. 一个真实节点应该映射成多少个虚拟节点
  2. 根据虚拟节点如何找到对应的真实节点

解决方案

1)理论上物理节点越少,须要的虚拟节点就越多。看下ketama算法的描述中:

clipboard.png

ketama默认是节点为160个

2)“虚拟节点”的hash计算能够采用对应节点的IP地址加带数字后缀的方式。如“192.168.0.0:111”,2个副本为“192.168.0.0:111-VN1”、“192.168.0.0:111-VN2”。
tips:在初始化虚拟节点到一致性hash环上的时候,能够直接h(192.168.0.0:111-VN2)->"192.168.0.0:111" 真实节点。

Ketama算法实现

如下的是net.spy.memcached.KetamaNodeLocator.Java的setKetamaNodes()方法的实现:

protected void setKetamaNodes(List<MemcachedNode> nodes) {
    TreeMap<Long, MemcachedNode> newNodeMap =
            new TreeMap<Long, MemcachedNode>();
    int numReps = config.getNodeRepetitions();
    int nodeCount = nodes.size();
    int totalWeight = 0;

    if (isWeightedKetama) {
        for (MemcachedNode node : nodes) {
            totalWeight += weights.get(node.getSocketAddress());
        }
    }

    for (MemcachedNode node : nodes) {
      if (isWeightedKetama) {

          int thisWeight = weights.get(node.getSocketAddress());
          float percent = (float)thisWeight / (float)totalWeight;
          int pointerPerServer = (int)((Math.floor((float)(percent * (float)config.getNodeRepetitions() / 4 * (float)nodeCount + 0.0000000001))) * 4);
          for (int i = 0; i < pointerPerServer / 4; i++) {
              for(long position : ketamaNodePositionsAtIteration(node, i)) {
                  newNodeMap.put(position, node);
                  getLogger().debug("Adding node %s with weight %s in position %d", node, thisWeight, position);
              }
          }
      } else {
          // Ketama does some special work with md5 where it reuses chunks.
          // Check to be backwards compatible, the hash algorithm does not
          // matter for Ketama, just the placement should always be done using
          // MD5
          if (hashAlg == DefaultHashAlgorithm.KETAMA_HASH) {
              for (int i = 0; i < numReps / 4; i++) {
                  for(long position : ketamaNodePositionsAtIteration(node, i)) {
                    newNodeMap.put(position, node);
                    getLogger().debug("Adding node %s in position %d", node, position);
                  }
              }
          } else {
              for (int i = 0; i < numReps; i++) {
                  newNodeMap.put(hashAlg.hash(config.getKeyForNode(node, i)), node);
              }
          }
      }
    }
    assert newNodeMap.size() == numReps * nodes.size();
    ketamaNodes = newNodeMap;
  }

详细的算法实现和分析见这篇文章

相关文章
相关标签/搜索