- 哈希算法的选择
- 请求节点在哈希环上寻找对应服务器的策略
- 哈希环的组织结构
TreeMap
.
- 虚拟节点与真实节点的映射关系
如何肯定一个虚拟节点对应的真实节点也是个问题. 理论上应该维护一张表记录真实节点与虚拟节点的映射关系. 本引入案例为了演示采用简单的字符串处理. 比方说服务器192.168.0.1:8888
分配了1000个虚拟节点, 那么它的虚拟节点名称从192.168.0.1:8888@1
一直到192.168.0.1:8888@1000
. 经过这样的处理, 咱们在经过虚拟节点找真实节点时只须要裁剪字符串便可.html
计划定制好后, 下面开始怼代码前端
public class ConsistentHashTest {
/** * 服务器列表,一共有3台服务器提供服务, 将根据性能分配虚拟节点 */
public static String[] servers = {
"192.168.0.1#100", //服务器1: 性能指数100, 将得到1000个虚拟节点
"192.168.0.2#100", //服务器2: 性能指数100, 将得到1000个虚拟节点
"192.168.0.3#30" //服务器3: 性能指数30, 将得到300个虚拟节点
};
/** * 真实服务器列表, 因为增长与删除的频率比遍历高, 用链表存储比较划算 */
private static List<String> realNodes = new LinkedList<>();
/** * 虚拟节点列表 */
private static TreeMap<Integer, String> virtualNodes = new TreeMap<>();
static{
for(String s : servers){
//把服务器加入真实服务器列表中
realNodes.add(s);
String[] strs = s.split("#");
//服务器名称, 省略端口号
String name = strs[0];
//根据服务器性能给每台真实服务器分配虚拟节点, 并把虚拟节点放到虚拟节点列表中.
int virtualNodeNum = Integer.parseInt(strs[1]) * 10;
for(int i = 1; i <= virtualNodeNum; i++){
virtualNodes.put(FVNHash(name + "@" + i), name + "@" + i);
}
}
}
public static void main(String[] args) {
new Thread(new RequestProcess()).start();
}
static class RequestProcess implements Runnable{
@Override
public void run() {
String client = null;
while(true){
//模拟产生一个请求
client = getN() + "." + getN() + "." + getN() + "." + getN() + ":" + (1000 + (int)(Math.random() * 9000));
//计算请求的哈希值
int hash = FVNHash(client);
//判断请求将由哪台服务器处理
System.out.println(client + " 的请求将由 " + getServer(client) + " 处理");
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
private static String getServer(String client) {
//计算客户端请求的哈希值
int hash = FVNHash(client);
//获得大于该哈希值的全部map集合
SortedMap<Integer, String> subMap = virtualNodes.tailMap(hash);
//找到比该值大的第一个虚拟节点, 若是没有比它大的虚拟节点, 根据哈希环, 则返回第一个节点.
Integer targetKey = subMap.size() == 0 ? virtualNodes.firstKey() : subMap.firstKey();
//经过该虚拟节点得到真实节点的名称
String virtualNodeName = virtualNodes.get(targetKey);
String realNodeName = virtualNodeName.split("@")[0];
return realNodeName;
}
public static int getN(){
return (int)(Math.random() * 128);
}
public static int FVNHash(String data){
final int p = 16777619;
int hash = (int)2166136261L;
for(int i = 0; i < data.length(); i++)
hash = (hash ^ data.charAt(i)) * p;
hash += hash << 13;
hash ^= hash >> 7;
hash += hash << 3;
hash ^= hash >> 17;
hash += hash << 5;
return hash < 0 ? Math.abs(hash) : hash;
}
}
/* 运行结果片断 55.1.13.47:6240 的请求将由 192.168.0.1 处理 5.49.56.126:1105 的请求将由 192.168.0.1 处理 90.41.8.88:6884 的请求将由 192.168.0.2 处理 26.107.104.81:2989 的请求将由 192.168.0.2 处理 114.66.6.56:8233 的请求将由 192.168.0.1 处理 123.74.52.94:5523 的请求将由 192.168.0.1 处理 104.59.60.2:7502 的请求将由 192.168.0.2 处理 4.94.30.79:1299 的请求将由 192.168.0.1 处理 10.44.37.73:9332 的请求将由 192.168.0.2 处理 115.93.93.82:6333 的请求将由 192.168.0.2 处理 15.24.97.66:9177 的请求将由 192.168.0.2 处理 100.39.98.10:1023 的请求将由 192.168.0.2 处理 61.118.87.26:5108 的请求将由 192.168.0.2 处理 17.79.104.35:3901 的请求将由 192.168.0.1 处理 95.36.5.25:8020 的请求将由 192.168.0.2 处理 126.74.56.71:7792 的请求将由 192.168.0.2 处理 14.63.56.45:8275 的请求将由 192.168.0.1 处理 58.53.44.71:2089 的请求将由 192.168.0.3 处理 80.64.57.43:6144 的请求将由 192.168.0.2 处理 46.65.4.18:7649 的请求将由 192.168.0.2 处理 57.35.27.62:9607 的请求将由 192.168.0.2 处理 81.114.72.3:3444 的请求将由 192.168.0.1 处理 38.18.61.26:6295 的请求将由 192.168.0.2 处理 71.75.18.82:9686 的请求将由 192.168.0.2 处理 26.11.98.111:3781 的请求将由 192.168.0.1 处理 62.86.23.37:8570 的请求将由 192.168.0.3 处理 */
复制代码
zookeeper
, zookeeper
的数据一致性算法保证数据实时, 准确, 客户端可以经过zookeeper
得知实时的服务器状况.zookeeper
, 并在zookeeper
上注册本身的接口服务(注册节点). 客户端链接上zookeeper
后, 把已注册的节点(服务器)添加到本身的服务器列表中.zookeeper
中注销. 客户端监听到服务器节点有变时, 也会动态调整本身的服务器列表, 把当宕机的服务器从服务器列表中删除, 所以不会再向该服务器发送请求, 负载均衡的任务将交到剩余的机器身上.zookeeper
集群环境:zookeeper
服务, 构成集群. 在各自的data
文件夹中添加一个myid
文件, 各个id分别为1, 2, 3
.
zookeeper
的端口号. 本案例中三台zookeeper
分别在2181, 2182, 2183
端口
zookeeper
集群因为zookeeper不是本案例的重点, 细节暂不展开讲了.java
zookeeper
依赖zookeeper
的信息public interface Constant {
//zookeeper集群的地址
String ZK_HOST = "192.168.117.129:2181,192.168.117.129:2182,192.168.117.129:2183";
//链接zookeeper的超时时间
int ZK_TIME_OUT = 5000;
//服务器所发布的远程服务在zookeeper中的注册地址, 也就是说这个节点中保存了各个服务器提供的接口
String ZK_REGISTRY = "/provider";
//zookeeper集群中注册服务的url地址的瞬时节点
String ZK_RMI = ZK_REGISTRY + "/rmi";
}
复制代码
3.封装操做zookeeper
和发布远程服务的接口供本身调用, 本案例中发布远程服务使用Java自身提供的rmi
包完成, 若是没有了解过能够参考这篇node
public class ServiceProvider {
private CountDownLatch latch = new CountDownLatch(1);
/** * 链接zookeeper集群 */
public ZooKeeper connectToZK(){
ZooKeeper zk = null;
try {
zk = new ZooKeeper(Constant.ZK_HOST, Constant.ZK_TIME_OUT, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
//若是链接上了就唤醒当前线程.
latch.countDown();
}
});
latch.await();//还没链接上时当前线程等待
} catch (Exception e) {
e.printStackTrace();
}
return zk;
}
/** * 建立znode节点 * @param zk * @param url 节点中写入的数据 */
public void createNode(ZooKeeper zk, String url){
try{
//要把写入的数据转化为字节数组
byte[] data = url.getBytes();
zk.create(Constant.ZK_RMI, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
} catch (Exception e) {
e.printStackTrace();
}
}
/** * 发布rmi服务 */
public String publishService(Remote remote, String host, int port){
String url = null;
try{
LocateRegistry.createRegistry(port);
url = "rmi://" + host + ":" + port + "/rmiService";
Naming.bind(url, remote);
} catch (Exception e) {
e.printStackTrace();
}
return url;
}
/** * 发布rmi服务, 而且将服务的url注册到zookeeper集群中 */
public void publish(Remote remote, String host, int port){
//调用publishService, 获得服务的url地址
String url = publishService(remote, host, port);
if(null != url){
ZooKeeper zk = connectToZK();//链接到zookeeper
if(null != zk){
createNode(zk, url);
}
}
}
}
复制代码
4. 自定义远程服务. 服务提供一个简单的方法: 客户端发来一个字符串, 服务器在字符串前面添加上Hello
, 并返回字符串.算法
//UserService
public interface UserService extends Remote {
public String helloRmi(String name) throws RemoteException;
}
//UserServiceImpl
public class UserServiceImpl implements UserService {
public UserServiceImpl() throws RemoteException{
super();
}
@Override
public String helloRmi(String name) throws RemoteException {
return "Hello " + name + "!";
}
}
复制代码
5. 修改端口号, 启动多个java虚拟机, 模拟服务器集群. 为了方便演示, 自定义7777, 8888, 9999端口开启3个服务器进程, 到时会模拟7777端口的服务器宕机和修复重连.数据库
public static void main(String[] args) throws RemoteException {
//建立工具类对象
ServiceProvider sp = new ServiceProvider();
//建立远程服务对象
UserService userService = new UserServiceImpl();
//完成发布
sp.publish(userService, "localhost", 9999);
}
复制代码
public class ServiceConsumer {
/** * 提供远程服务的服务器列表, 只记录远程服务的url */
private volatile List<String> urls = new LinkedList<>();
/** * 远程服务对应的虚拟节点集合 */
private static TreeMap<Integer, String> virtualNodes = new TreeMap<>();
public ServiceConsumer(){
ZooKeeper zk = connectToZK();//客户端链接到zookeeper
if(null != zk){
//链接上后关注zookeeper中的节点变化(服务器变化)
watchNode(zk);
}
}
private void watchNode(final ZooKeeper zk) {
try{
//观察/provider节点下的子节点是否有变化(是否有服务器登入或登出)
List<String> nodeList = zk.getChildren(Constants.ZK_REGISTRY, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
//若是服务器节点有变化就从新获取
if(watchedEvent.getType() == Event.EventType.NodeChildrenChanged){
System.out.println("服务器端有变化, 可能有旧服务器宕机或者新服务器加入集群...");
watchNode(zk);
}
}
});
//将获取到的服务器节点数据保存到集合中, 也就是得到了远程服务的访问url地址
List<String> dataList = new LinkedList<>();
TreeMap<Integer, String> newVirtualNodesList = new TreeMap<>();
for(String nodeStr : nodeList){
byte[] data = zk.getData(Constants.ZK_REGISTRY + "/" + nodeStr, false, null);
//放入服务器列表的url
String url = new String(data);
//为每一个服务器分配虚拟节点, 为了方便模拟, 默认开启在9999端口的服务器性能较差, 只分配300个虚拟节点, 其余分配1000个.
if(url.contains("9999")){
for(int i = 1; i <= 300; i++){
newVirtualNodesList.put(FVNHash(url + "@" + i), url + "@" + i);
}
}else{
for(int i = 1; i <= 1000; i++){
newVirtualNodesList.put(FVNHash(url + "@" + i), url + "@" + i);
}
}
dataList.add(url);
}
urls = dataList;
virtualNodes = newVirtualNodesList;
dataList = null;//好让垃圾回收器尽快收集
newVirtualNodesList = null;
} catch (Exception e) {
e.printStackTrace();
}
}
/** * 根据url得到远程服务对象 */
public <T> T lookUpService(String url){
T remote = null;
try{
remote = (T)Naming.lookup(url);
} catch (Exception e) {
//若是该url链接不上, 颇有多是该服务器挂了, 这时使用服务器列表中的第一个服务器url从新获取远程对象.
if(e instanceof ConnectException){
if (urls.size() != 0){
url = urls.get(0);
return lookUpService(url);
}
}
}
return remote;
}
/** * 经过一致性哈希算法, 选取一个url, 最后返回一个远程服务对象 */
public <T extends Remote> T lookUp(){
T service = null;
//随机计算一个哈希值
int hash = FVNHash(Math.random() * 10000 + "");
//获得大于该哈希值的全部map集合
SortedMap<Integer, String> subMap = virtualNodes.tailMap(hash);
//找到比该值大的第一个虚拟节点, 若是没有比它大的虚拟节点, 根据哈希环, 则返回第一个节点.
Integer targetKey = subMap.size() == 0 ? virtualNodes.firstKey() : subMap.firstKey();
//经过该虚拟节点得到服务器url
String virtualNodeName = virtualNodes.get(targetKey);
String url = virtualNodeName.split("@")[0];
//根据服务器url获取远程服务对象
service = lookUpService(url);
System.out.print("提供本次服务的地址为: " + url + ", 返回结果: ");
return service;
}
private CountDownLatch latch = new CountDownLatch(1);
public ZooKeeper connectToZK(){
ZooKeeper zk = null;
try {
zk = new ZooKeeper(Constants.ZK_HOST, Constants.ZK_TIME_OUT, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
//判断是否链接zk集群
latch.countDown();//唤醒处于等待状态的当前线程
}
});
latch.await();//没有链接上的时候当前线程处于等待状态.
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
return zk;
}
public static int FVNHash(String data){
final int p = 16777619;
int hash = (int)2166136261L;
for(int i = 0; i < data.length(); i++)
hash = (hash ^ data.charAt(i)) * p;
hash += hash << 13;
hash ^= hash >> 7;
hash += hash << 3;
hash ^= hash >> 17;
hash += hash << 5;
return hash < 0 ? Math.abs(hash) : hash;
}
}
复制代码
2. 启动客户端进行测试后端
public static void main(String[] args){
ServiceConsumer sc = new ServiceConsumer();//建立工具类对象
while(true){
//得到rmi远程服务对象
UserService userService = sc.lookUp();
try{
//调用远程方法
String result = userService.helloRmi("炭烧生蚝");
System.out.println(result);
Thread.sleep(100);
}catch(Exception e){
e.printStackTrace();
}
}
}
复制代码
3. 客户端跑起来后, 在显示台不断进行打印...下面将对数据进行统计. 数组
public class DataStatistics {
private static float ReqToPort7777 = 0;
private static float ReqToPort8888 = 0;
private static float ReqToPort9999 = 0;
public static void main(String[] args) {
BufferedReader br = null;
try {
br = new BufferedReader(new FileReader("C://test.txt"));
String line = null;
while(null != (line = br.readLine())){
if(line.contains("7777")){
ReqToPort7777++;
}else if(line.contains("8888")){
ReqToPort8888++;
}else if(line.contains("9999")){
ReqToPort9999++;
}else{
print(false);
}
}
print(true);
} catch (Exception e) {
e.printStackTrace();
}finally {
if(null != br){
try {
br.close();
} catch (IOException e) {
e.printStackTrace();
}
br = null;
}
}
}
private static void print(boolean isEnd){
if(!isEnd){
System.out.println("------------- 服务器集群发生变化 -------------");
}else{
System.out.println("------------- 最后一次统计 -------------");
}
System.out.println("截取自上次服务器变化到如今: ");
float total = ReqToPort7777 + ReqToPort8888 + ReqToPort9999;
System.out.println("7777端口服务器访问量为: " + ReqToPort7777 + ", 占比" + (ReqToPort7777 / total));
System.out.println("8888端口服务器访问量为: " + ReqToPort8888 + ", 占比" + (ReqToPort8888 / total));
System.out.println("9999端口服务器访问量为: " + ReqToPort9999 + ", 占比" + (ReqToPort9999 / total));
ReqToPort7777 = 0;
ReqToPort8888 = 0;
ReqToPort9999 = 0;
}
}
/* 如下是输出结果 ------------- 服务器集群发生变化 ------------- 截取自上次服务器变化到如今: 7777端口服务器访问量为: 198.0, 占比0.4419643 8888端口服务器访问量为: 184.0, 占比0.4107143 9999端口服务器访问量为: 66.0, 占比0.14732143 ------------- 服务器集群发生变化 ------------- 截取自上次服务器变化到如今: 7777端口服务器访问量为: 510.0, 占比0.7589286 8888端口服务器访问量为: 1.0, 占比0.0014880953 9999端口服务器访问量为: 161.0, 占比0.23958333 ------------- 最后一次统计 ------------- 截取自上次服务器变化到如今: 7777端口服务器访问量为: 410.0, 占比0.43248945 8888端口服务器访问量为: 398.0, 占比0.41983122 9999端口服务器访问量为: 140.0, 占比0.14767933 */
复制代码
uid
到达, 咱们能够经过哈希函数进行处理(从上面的演示代码能够看到, 这点是能够轻松实现的), 使一样的uid
路由到某个独定的服务器. 这样咱们就能够在服务器上对该的uid
背后的用户信息进行缓存, 从而减小对数据库或其余中间件的操做, 从而提升系统效率.Dubbo
框架中对其实现的四种负载均衡策略的描述.