动手实现一致性哈希算法,并搭建环境测试其负载均衡特性.

一. 简述一致性哈希算法

  • 这里不详细介绍一致性哈希算法的起源了, 网上能方便地搜到许多介绍一致性哈希算法的好文章. 本文主要想动手实现一致性哈希算法, 并搭建一个环境进行实战测试.
  • 在开始以前先整理一下算法的思路:
  • 一致性哈希算法经过把每台服务器的哈希值打在哈希环上, 把哈希环分红不一样的段, 而后对到来的请求计算哈希值从而得知该请求所归属的服务器. 这个办法解决了传统服务器增减机器时须要从新计算哈希的麻烦.
  • 但若是服务器的数量较少, 可能致使计算出的哈希值相差较小, 在哈希环上分布不均匀, 致使某台服务器过载. 为了解决负载均衡问题, 咱们引入虚拟节点技术, 为每台服务器分配必定数量的节点, 经过节点的哈希值在哈希环上进行划分. 这样一来, 咱们就能够根据机器的性能为其分配节点, 性能好就多分配一点, 差就少一点, 从而达到负载均衡.

二. 实现一致性哈希算法.

  • 奠基了总体思路后咱们开始考虑实现的细节
  1. 哈希算法的选择
  • 选择能散列出32位整数的FNV算法, 因为该哈希函数可能产生负数, 须要做取绝对值处理.
  1. 请求节点在哈希环上寻找对应服务器的策略
  • 策略为: 新节点寻找最近比且它大的节点, 好比说如今已经有环[0, 5, 7, 10], 来了个哈希值为6的节点, 那么它应该由哈希值为7对应的服务器处理. 若是请求节点所计算的哈希值大于环上的全部节点, 那么就取第一个节点. 好比来了个11, 将分配到0所对应的节点.
  1. 哈希环的组织结构
  • 开始的时候想过用顺序存储的结构存放, 可是在一致性哈希中, 最频繁的操做是在集合中查找最近且比目标大的数. 若是用顺序存储结构的话, 时间复杂度是收敛于O(N)的, 而树形结构则为更优的O(logN).
  • 但凡事有两面, 采用树形结构存储的代价是数据初始化的效率较低, 并且运行期间若是有节点插入删除的话效率也比较低. 可是在现实中, 服务器在一开始注册后基本上就不怎么变了, 期间增减机器, 宕机, 机器修复等事件的频率相比起节点的查询简直是微不足道. 因此本案例决定使用使用树形结构存储.
  • 贴合上述要求, 而且提供有序存储的首先想到的是红黑树, 并且Java中提供了红黑树的实现TreeMap.
  1. 虚拟节点与真实节点的映射关系
  • 如何肯定一个虚拟节点对应的真实节点也是个问题. 理论上应该维护一张表记录真实节点与虚拟节点的映射关系. 本引入案例为了演示采用简单的字符串处理. 比方说服务器192.168.0.1:8888分配了1000个虚拟节点, 那么它的虚拟节点名称从192.168.0.1:8888@1一直到192.168.0.1:8888@1000. 经过这样的处理, 咱们在经过虚拟节点找真实节点时只须要裁剪字符串便可.html

  • 计划定制好后, 下面开始怼代码前端

public class ConsistentHashTest {
    /** * 服务器列表,一共有3台服务器提供服务, 将根据性能分配虚拟节点 */
    public static String[] servers = {
            "192.168.0.1#100", //服务器1: 性能指数100, 将得到1000个虚拟节点
            "192.168.0.2#100", //服务器2: 性能指数100, 将得到1000个虚拟节点
            "192.168.0.3#30"   //服务器3: 性能指数30, 将得到300个虚拟节点
    };
    /** * 真实服务器列表, 因为增长与删除的频率比遍历高, 用链表存储比较划算 */
    private static List<String> realNodes = new LinkedList<>();
    /** * 虚拟节点列表 */
    private static TreeMap<Integer, String> virtualNodes = new TreeMap<>();

    static{
        for(String s : servers){
            //把服务器加入真实服务器列表中
            realNodes.add(s);
            String[] strs = s.split("#");
            //服务器名称, 省略端口号
            String name = strs[0];
            //根据服务器性能给每台真实服务器分配虚拟节点, 并把虚拟节点放到虚拟节点列表中.
            int virtualNodeNum = Integer.parseInt(strs[1]) * 10;
            for(int i = 1; i <= virtualNodeNum; i++){
                virtualNodes.put(FVNHash(name + "@" + i), name + "@" + i);
            }
        }
    }

    public static void main(String[] args) {
        new Thread(new RequestProcess()).start();
    }

    static class RequestProcess implements Runnable{
        @Override
        public void run() {
            String client = null;
            while(true){
                //模拟产生一个请求
                client = getN() + "." + getN() + "." + getN() + "." + getN() + ":" + (1000 + (int)(Math.random() * 9000));
                //计算请求的哈希值
                int hash = FVNHash(client);
                //判断请求将由哪台服务器处理
                System.out.println(client + " 的请求将由 " + getServer(client) + " 处理");
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

        }
    }

    private static String getServer(String client) {
        //计算客户端请求的哈希值
        int hash = FVNHash(client);
        //获得大于该哈希值的全部map集合
        SortedMap<Integer, String> subMap = virtualNodes.tailMap(hash);
        //找到比该值大的第一个虚拟节点, 若是没有比它大的虚拟节点, 根据哈希环, 则返回第一个节点.
        Integer targetKey = subMap.size() == 0 ? virtualNodes.firstKey() : subMap.firstKey();
        //经过该虚拟节点得到真实节点的名称
        String virtualNodeName = virtualNodes.get(targetKey);
        String realNodeName = virtualNodeName.split("@")[0];
        return realNodeName;
    }

    public static int getN(){
        return (int)(Math.random() * 128);
    }

    public static int FVNHash(String data){
        final int p = 16777619;
        int hash = (int)2166136261L;
        for(int i = 0; i < data.length(); i++)
            hash = (hash ^ data.charAt(i)) * p;
        hash += hash << 13;
        hash ^= hash >> 7;
        hash += hash << 3;
        hash ^= hash >> 17;
        hash += hash << 5;
        return hash < 0 ? Math.abs(hash) : hash;
    }
}

/* 运行结果片断 55.1.13.47:6240 的请求将由 192.168.0.1 处理 5.49.56.126:1105 的请求将由 192.168.0.1 处理 90.41.8.88:6884 的请求将由 192.168.0.2 处理 26.107.104.81:2989 的请求将由 192.168.0.2 处理 114.66.6.56:8233 的请求将由 192.168.0.1 处理 123.74.52.94:5523 的请求将由 192.168.0.1 处理 104.59.60.2:7502 的请求将由 192.168.0.2 处理 4.94.30.79:1299 的请求将由 192.168.0.1 处理 10.44.37.73:9332 的请求将由 192.168.0.2 处理 115.93.93.82:6333 的请求将由 192.168.0.2 处理 15.24.97.66:9177 的请求将由 192.168.0.2 处理 100.39.98.10:1023 的请求将由 192.168.0.2 处理 61.118.87.26:5108 的请求将由 192.168.0.2 处理 17.79.104.35:3901 的请求将由 192.168.0.1 处理 95.36.5.25:8020 的请求将由 192.168.0.2 处理 126.74.56.71:7792 的请求将由 192.168.0.2 处理 14.63.56.45:8275 的请求将由 192.168.0.1 处理 58.53.44.71:2089 的请求将由 192.168.0.3 处理 80.64.57.43:6144 的请求将由 192.168.0.2 处理 46.65.4.18:7649 的请求将由 192.168.0.2 处理 57.35.27.62:9607 的请求将由 192.168.0.2 处理 81.114.72.3:3444 的请求将由 192.168.0.1 处理 38.18.61.26:6295 的请求将由 192.168.0.2 处理 71.75.18.82:9686 的请求将由 192.168.0.2 处理 26.11.98.111:3781 的请求将由 192.168.0.1 处理 62.86.23.37:8570 的请求将由 192.168.0.3 处理 */
复制代码

 

  • 通过上面的测试咱们能够看到性能较好的服务器1和服务器2分担了大部分的请求, 只有少部分请求落到了性能较差的服务器3上, 已经初步实现了负载均衡.
  • 下面咱们将结合zookeeper, 搭建一个更加逼真的服务器集群, 看看在部分服务器上线下线的过程当中, 一致性哈希算法是否仍可以实现负载均衡.

 

三. 结合zookeeper搭建环境

环境介绍

  • 首先会经过启动多台虚拟机模拟服务器集群, 各台服务器都提供一个相同的接口供消费者消费.
  • 同时会有一个消费者线程不断地向服务器集群发起请求, 这些请求会通过一致性哈希算法均衡负载到各个服务器.
  • 为了可以模拟上述场景, 咱们必须在客户端维护一个服务器列表, 使得客户端可以经过一致性哈希算法选择服务器发送. (现实中可能会把一致性哈希算法实如今前端服务器, 客户先访问前端服务器, 再路由到后端服务器集群).
  • 可是咱们的重点是模拟服务器的宕机和上线, 看看一致性哈希算法是否仍能实现负载均衡. 因此客户端必须可以感知服务器端的变化并动态地调整它的服务器列表.
  • 为了完成这项工做, 咱们引入zookeeper, zookeeper的数据一致性算法保证数据实时, 准确, 客户端可以经过zookeeper得知实时的服务器状况.
  • 具体操做是这样的: 服务器集群先以临时节点的方式链接到zookeeper, 并在zookeeper上注册本身的接口服务(注册节点). 客户端链接上zookeeper后, 把已注册的节点(服务器)添加到本身的服务器列表中.
  • 若是有服务器宕机的话, 因为当初注册的是瞬时节点的缘由, 该台服务器节点会从zookeeper中注销. 客户端监听到服务器节点有变时, 也会动态调整本身的服务器列表, 把当宕机的服务器从服务器列表中删除, 所以不会再向该服务器发送请求, 负载均衡的任务将交到剩余的机器身上.
  • 当有服务器重新链接上集群后, 客户端的服务器列表也会更新, 哈希环也将作出相应的变化以提供负载均衡.

具体操做:

I. 搭建zookeeper集群环境:

  1. 建立3个zookeeper服务, 构成集群. 在各自的data文件夹中添加一个myid文件, 各个id分别为1, 2, 3.
    在这里插入图片描述
  2. 从新复制一份配置文件, 在配置文件中配置各个zookeeper的端口号. 本案例中三台zookeeper分别在2181, 2182, 2183端口
    在这里插入图片描述
  3. 启动zookeeper集群

因为zookeeper不是本案例的重点, 细节暂不展开讲了.java

 

II. 建立服务器集群, 提供RPC远程调用服务

  1. 首先建立一个服务器项目(使用Maven), 添加zookeeper依赖
  2. 建立常量接口, 用于存储链接zookeeper 的信息
public interface Constant {
    //zookeeper集群的地址
    String ZK_HOST = "192.168.117.129:2181,192.168.117.129:2182,192.168.117.129:2183";
    //链接zookeeper的超时时间
    int ZK_TIME_OUT = 5000;
    //服务器所发布的远程服务在zookeeper中的注册地址, 也就是说这个节点中保存了各个服务器提供的接口
    String ZK_REGISTRY = "/provider";
    //zookeeper集群中注册服务的url地址的瞬时节点
    String ZK_RMI = ZK_REGISTRY + "/rmi";
}
复制代码

  3.封装操做zookeeper和发布远程服务的接口供本身调用, 本案例中发布远程服务使用Java自身提供的rmi包完成, 若是没有了解过能够参考这篇node

public class ServiceProvider {

    private CountDownLatch latch = new CountDownLatch(1);

    /** * 链接zookeeper集群 */
    public ZooKeeper connectToZK(){
        ZooKeeper zk = null;
        try {
            zk = new ZooKeeper(Constant.ZK_HOST, Constant.ZK_TIME_OUT, new Watcher() {
                @Override
                public void process(WatchedEvent watchedEvent) {
                    //若是链接上了就唤醒当前线程.
                    latch.countDown();
                }
            });
            latch.await();//还没链接上时当前线程等待
        } catch (Exception e) {
            e.printStackTrace();
        }
        return zk;
    }

    /** * 建立znode节点 * @param zk * @param url 节点中写入的数据 */
    public void createNode(ZooKeeper zk, String url){
        try{
            //要把写入的数据转化为字节数组
            byte[] data = url.getBytes();
            zk.create(Constant.ZK_RMI, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /** * 发布rmi服务 */
    public String publishService(Remote remote, String host, int port){
        String url = null;
        try{
            LocateRegistry.createRegistry(port);
            url = "rmi://" + host + ":" + port + "/rmiService";
            Naming.bind(url, remote);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return url;
    }

    /** * 发布rmi服务, 而且将服务的url注册到zookeeper集群中 */
    public void publish(Remote remote, String host, int port){
        //调用publishService, 获得服务的url地址
        String url = publishService(remote, host, port);
        if(null != url){
            ZooKeeper zk = connectToZK();//链接到zookeeper
            if(null != zk){
                createNode(zk, url);
            }
        }
    }
}
复制代码

  4. 自定义远程服务. 服务提供一个简单的方法: 客户端发来一个字符串, 服务器在字符串前面添加上Hello, 并返回字符串.算法

//UserService
public interface UserService extends Remote {
    public String helloRmi(String name) throws RemoteException;
}
//UserServiceImpl
public class UserServiceImpl implements UserService {

    public UserServiceImpl() throws RemoteException{
        super();
    }

    @Override
    public String helloRmi(String name) throws RemoteException {
        return "Hello " + name + "!";
    }
}
复制代码

  5. 修改端口号, 启动多个java虚拟机, 模拟服务器集群. 为了方便演示, 自定义7777, 8888, 9999端口开启3个服务器进程, 到时会模拟7777端口的服务器宕机和修复重连.数据库

public static void main(String[] args) throws RemoteException {
    //建立工具类对象
    ServiceProvider sp = new ServiceProvider();
    //建立远程服务对象
    UserService userService = new UserServiceImpl();
    //完成发布
    sp.publish(userService, "localhost", 9999);
}
复制代码

在这里插入图片描述

 

III. 编写客户端程序(运用一致性哈希算法实现负载均衡

  1. 封装客户端接口.
public class ServiceConsumer {
    /** * 提供远程服务的服务器列表, 只记录远程服务的url */
    private volatile List<String> urls = new LinkedList<>();
    /** * 远程服务对应的虚拟节点集合 */
    private static TreeMap<Integer, String> virtualNodes = new TreeMap<>();

    public ServiceConsumer(){
        ZooKeeper zk = connectToZK();//客户端链接到zookeeper
        if(null != zk){
            //链接上后关注zookeeper中的节点变化(服务器变化)
            watchNode(zk);
        }
    }

    private void watchNode(final ZooKeeper zk) {
        try{
            //观察/provider节点下的子节点是否有变化(是否有服务器登入或登出)
            List<String> nodeList = zk.getChildren(Constants.ZK_REGISTRY, new Watcher() {
                @Override
                public void process(WatchedEvent watchedEvent) {
                    //若是服务器节点有变化就从新获取
                    if(watchedEvent.getType() == Event.EventType.NodeChildrenChanged){
                        System.out.println("服务器端有变化, 可能有旧服务器宕机或者新服务器加入集群...");
                        watchNode(zk);
                    }
                }
            });
            //将获取到的服务器节点数据保存到集合中, 也就是得到了远程服务的访问url地址
            List<String> dataList = new LinkedList<>();
            TreeMap<Integer, String> newVirtualNodesList = new TreeMap<>();
            for(String nodeStr : nodeList){
                byte[] data = zk.getData(Constants.ZK_REGISTRY + "/" + nodeStr, false, null);
                //放入服务器列表的url
                String url = new String(data);
                //为每一个服务器分配虚拟节点, 为了方便模拟, 默认开启在9999端口的服务器性能较差, 只分配300个虚拟节点, 其余分配1000个.
                if(url.contains("9999")){
                    for(int i = 1; i <= 300; i++){
                        newVirtualNodesList.put(FVNHash(url + "@" + i), url + "@" + i);
                    }
                }else{
                    for(int i = 1; i <= 1000; i++){
                        newVirtualNodesList.put(FVNHash(url + "@" + i), url + "@" + i);
                    }
                }
                dataList.add(url);
            }
            urls = dataList;
            virtualNodes = newVirtualNodesList;
            dataList = null;//好让垃圾回收器尽快收集
            newVirtualNodesList = null;
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /** * 根据url得到远程服务对象 */
    public <T> T lookUpService(String url){
        T remote = null;
        try{
            remote = (T)Naming.lookup(url);
        } catch (Exception e) {
            //若是该url链接不上, 颇有多是该服务器挂了, 这时使用服务器列表中的第一个服务器url从新获取远程对象.
            if(e instanceof ConnectException){
                if (urls.size() != 0){
                    url = urls.get(0);
                    return lookUpService(url);
                }
            }
        }
        return remote;
    }

    /** * 经过一致性哈希算法, 选取一个url, 最后返回一个远程服务对象 */
    public <T extends Remote> T lookUp(){
        T service = null;
        //随机计算一个哈希值
        int hash = FVNHash(Math.random() * 10000 + "");
        //获得大于该哈希值的全部map集合
        SortedMap<Integer, String> subMap = virtualNodes.tailMap(hash);
        //找到比该值大的第一个虚拟节点, 若是没有比它大的虚拟节点, 根据哈希环, 则返回第一个节点.
        Integer targetKey = subMap.size() == 0 ? virtualNodes.firstKey() : subMap.firstKey();
        //经过该虚拟节点得到服务器url
        String virtualNodeName = virtualNodes.get(targetKey);
        String url = virtualNodeName.split("@")[0];
        //根据服务器url获取远程服务对象
        service = lookUpService(url);
        System.out.print("提供本次服务的地址为: " + url + ", 返回结果: ");
        return service;
    }

    private CountDownLatch latch = new CountDownLatch(1);

    public ZooKeeper connectToZK(){
        ZooKeeper zk = null;
        try {
            zk = new ZooKeeper(Constants.ZK_HOST, Constants.ZK_TIME_OUT, new Watcher() {
                @Override
                public void process(WatchedEvent watchedEvent) {
                    //判断是否链接zk集群
                    latch.countDown();//唤醒处于等待状态的当前线程
                }
            });
            latch.await();//没有链接上的时候当前线程处于等待状态.
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return zk;
    }


    public static int FVNHash(String data){
        final int p = 16777619;
        int hash = (int)2166136261L;
        for(int i = 0; i < data.length(); i++)
            hash = (hash ^ data.charAt(i)) * p;
        hash += hash << 13;
        hash ^= hash >> 7;
        hash += hash << 3;
        hash ^= hash >> 17;
        hash += hash << 5;
        return hash < 0 ? Math.abs(hash) : hash;
    }
}
复制代码

  2. 启动客户端进行测试后端

public static void main(String[] args){
    ServiceConsumer sc = new ServiceConsumer();//建立工具类对象
    while(true){
        //得到rmi远程服务对象
        UserService userService = sc.lookUp();
        try{
            //调用远程方法
            String result = userService.helloRmi("炭烧生蚝");
            System.out.println(result);
            Thread.sleep(100);
        }catch(Exception e){
            e.printStackTrace();
        }
    }
}
复制代码

  3. 客户端跑起来后, 在显示台不断进行打印...下面将对数据进行统计. 数组

在这里插入图片描述
在这里插入图片描述
 

IV. 对服务器调用数据进行统计分析

  • 重温一遍模拟的过程: 首先分别在7777, 8888, 9999端口启动了3台服务器. 而后启动客户端进行访问. 7777, 8888端口的两台服务器设置性能指数为1000, 而9999端口的服务器性能指数设置为300.
  • 在客户端运行期间, 我手动关闭了8888端口的服务器, 客户端正常打印出服务器变化信息. 此时理论上不会有访问被路由到8888端口的服务器. 当我从新启动8888端口服务器时, 客户端打印出服务器变化信息, 访问能正常到达8888端口服务器.
  • 下面对各服务器的访问量进行统计, 看是否实现了负载均衡.
  • 测试程序以下:
public class DataStatistics {
    private static float ReqToPort7777 = 0;
    private static float ReqToPort8888 = 0;
    private static float ReqToPort9999 = 0;

    public static void main(String[] args) {
        BufferedReader br = null;
        try {
            br = new BufferedReader(new FileReader("C://test.txt"));
            String line = null;
            while(null != (line = br.readLine())){
                if(line.contains("7777")){
                    ReqToPort7777++;
                }else if(line.contains("8888")){
                    ReqToPort8888++;
                }else if(line.contains("9999")){
                    ReqToPort9999++;
                }else{
                    print(false);
                }
            }
            print(true);
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            if(null != br){
                try {
                    br.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
                br = null;
            }
        }
    }

    private static void print(boolean isEnd){
        if(!isEnd){
            System.out.println("------------- 服务器集群发生变化 -------------");
        }else{
            System.out.println("------------- 最后一次统计 -------------");
        }
        System.out.println("截取自上次服务器变化到如今: ");
        float total = ReqToPort7777 + ReqToPort8888 + ReqToPort9999;
        System.out.println("7777端口服务器访问量为: " + ReqToPort7777 + ", 占比" + (ReqToPort7777 / total));
        System.out.println("8888端口服务器访问量为: " + ReqToPort8888 + ", 占比" + (ReqToPort8888 / total));
        System.out.println("9999端口服务器访问量为: " + ReqToPort9999 + ", 占比" + (ReqToPort9999 / total));
        ReqToPort7777 = 0;
        ReqToPort8888 = 0;
        ReqToPort9999 = 0;
    }
}

/* 如下是输出结果 ------------- 服务器集群发生变化 ------------- 截取自上次服务器变化到如今: 7777端口服务器访问量为: 198.0, 占比0.4419643 8888端口服务器访问量为: 184.0, 占比0.4107143 9999端口服务器访问量为: 66.0, 占比0.14732143 ------------- 服务器集群发生变化 ------------- 截取自上次服务器变化到如今: 7777端口服务器访问量为: 510.0, 占比0.7589286 8888端口服务器访问量为: 1.0, 占比0.0014880953 9999端口服务器访问量为: 161.0, 占比0.23958333 ------------- 最后一次统计 ------------- 截取自上次服务器变化到如今: 7777端口服务器访问量为: 410.0, 占比0.43248945 8888端口服务器访问量为: 398.0, 占比0.41983122 9999端口服务器访问量为: 140.0, 占比0.14767933 */
复制代码

 

V. 结果

  • 从测试数据能够看出, 不论是8888端口服务器宕机以前, 仍是宕机以后, 三台服务器接收的访问量和性能指数成正比. 成功地验证了一致性哈希算法的负载均衡做用.

 

四. 扩展思考

  • 初识一致性哈希算法的时候, 对这种奇特的思路佩服得五体投地. 可是一致性哈希算法除了可以让后端服务器实现负载均衡, 还有一个特色多是其余负载均衡算法所不具有的.
  • 这个特色是基于哈希函数的, 咱们知道经过哈希函数, 固定的输入可以产生固定的输出. 换句话说, 一样的请求会路由到相同的服务器. 这点就很牛逼了, 咱们能够结合一致性哈希算法和缓存机制提供后端服务器的性能.
  • 好比说在一个分布式系统中, 有一个服务器集群提供查询用户信息的方法, 每一个请求将会带着用户的uid到达, 咱们能够经过哈希函数进行处理(从上面的演示代码能够看到, 这点是能够轻松实现的), 使一样的uid路由到某个独定的服务器. 这样咱们就能够在服务器上对该的uid背后的用户信息进行缓存, 从而减小对数据库或其余中间件的操做, 从而提升系统效率.
  • 固然若是使用该策略的话, 你可能还要考虑缓存更新等操做, 但做为一种优良的策略, 咱们能够考虑在适当的场合灵活运用.
  • 以上思考受启发于Dubbo框架中对其实现的四种负载均衡策略的描述.
相关文章
相关标签/搜索