写做不易,未经做者容许禁止以任何形式转载!
若是以为文章不错,欢迎关注、点赞和分享!
持续分享技术博文,关注微信公众号 👉🏻 前端LeBron
字节跳动校招进行中,校招内推码: 4FCV6BV 游戏部门前端团队可私聊直推javascript
简介html
负载均衡,含义就是根据必定算法将负载(工做任务)进行平衡,分摊到多个操做单元上运行、执行,常见的为Web服务器、企业核心应用服务器和其余主要任务服务器等,从而协同完成工做任务。负载均衡在原有的网络结构上提供了一种透明且有效的的方法扩展服务器和网络设备的带宽、增强网络数据处理能力、增长吞吐量、提升网络的可用性和灵活性,同时承受住更大的并发量级。前端
简单来讲就是将大量的并发请求处理转发给多个后端节点处理,减小工做响应时间。java
四层即OSI七层模型中的传输层,有TCP、UDP协议,这两种协议中包含源IP、目标IP之外,还包含源端口号及目标端口号。四层负载均衡在接收到客户端请求后,经过修改报文的地址信息(IP + PORT)将流量转发到应用服务器。node
代理负载均衡ios
七层即OSI七层模型中的应用层,应用层协议较多,经常使用的为HTTP/HTTPS。七层负载均衡能够给予这些协议来负载。这些应用层协议中会包含不少有意义的内容。好比同一个Web服务器的负载均衡,除了根据IP + PORT进行负载均衡,还能够根据七层的URL、Cookie、浏览器类别、语言、请求类型来决定。git
四层负载均衡的本质是转发,七层负载均衡的本质是内容交换和代理。github
四层负载均衡 | 七层负载均衡 | |
---|---|---|
基于 | IP + PORT | URL 或 主机IP |
相似 | 路由器 | 代理服务器 |
复杂度 | 低 | 高 |
性能 | 高,无需解析内容 | 中,需算法识别URL Header、Cookie等 |
安全性 | 低,没法识别DDoS攻击 | 高,可防护SYN Flood攻击 |
扩展功能 | 无 | 内容缓存、图片防盗链等 |
前置数据结构web
interface urlObj{
url:string,
weight:number // 仅在权重轮询时生效
}
urlDesc: urlObj[]
interface urlCollectObj{
count: number, // 链接数
costTime: number, // 响应时间
connection: number, // 实时链接数
}
urlCollect: urlCollectObj[]
复制代码
随机算法
const Random = (urlDesc) => {
let urlCollect = [];
// 收集url
urlDesc.forEach((val) => {
urlCollect.push(val.url);
});
return () => {
// 生成随机数下标返回相应URL
const pos = parseInt(Math.random() * urlCollect.length);
return urlCollect[pos];
};
};
module.exports = Random;
复制代码
权重轮询算法
const WeiRoundRobin = (urlDesc) => {
let pos = 0,
urlCollect = [],
copyUrlDesc = JSON.parse(JSON.stringify(urlDesc));
// 根据权重收集url
while (copyUrlDesc.length > 0) {
for (let i = 0; i < copyUrlDesc.length; i++) {
urlCollect.push(copyUrlDesc[i].url);
copyUrlDesc[i].weight--;
if (copyUrlDesc[i].weight === 0) {
copyUrlDesc.splice(i, 1);
i--;
}
}
}
// 轮询获取URL函数
return () => {
const res = urlCollect[pos++];
if (pos === urlCollect.length) {
pos = 0;
}
return res;
};
};
module.exports = WeiRoundRobin;
复制代码
源IP / URL Hash
const { Hash } = require("../util");
const IpHash = (urlDesc) => {
let urlCollect = [];
for (const key in urlDesc) {
// 收集url
urlCollect.push(urlDesc[key].url);
}
return (sourceInfo) => {
// 生成Hash十进制数值
const hashInfo = Hash(sourceInfo);
// 取余为下标
const urlPos = Math.abs(hashInfo) % urlCollect.length;
// 返回
return urlCollect[urlPos];
};
};
module.exports = IpHash;
复制代码
一致性Hash
const { Hash } = require("../util");
const ConsistentHash = (urlDesc) => {
let urlHashMap = {},
hashCollect = [];
for (const key in urlDesc) {
// 收集urlHash进数组和生成HashMap
const { url } = urlDesc[key];
const hash = Hash(url);
urlHashMap[hash] = url;
hashCollect.push(hash);
}
// 将hash数组从小到大排序
hashCollect = hashCollect.sort((a, b) => a - b);
return (sourceInfo) => {
// 生成Hash十进制数值
const hashInfo = Hash(sourceInfo);
// 遍历hash数组找到第一个比源信息hash值大的,并经过hashMap返回url
hashCollect.forEach((val) => {
if (val >= hashInfo) {
return urlHashMap[val];
}
});
// 没找大则返回最大的
return urlHashMap[hashCollect[hashCollect.length - 1]];
};
};
module.exports = ConsistentHash;
复制代码
最小链接数
const leastConnections = () => {
return (urlCollect) => {
let min = Number.POSITIVE_INFINITY,
url = "";
// 遍历对象找到最少链接数的地址
for (let key in urlCollect) {
const val = urlCollect[key].connection;
if (val < min) {
min = val;
url = key;
}
}
// 返回
return url;
};
};
module.exports = leastConnections;
复制代码
注:urlCollect为负载均属数据统计对象,有如下属性
最小响应时间
const Fair = () => {
return (urlCollect) => {
let min = Number.POSITIVE_INFINITY,
url = "";
// 找到耗时最少的url
for (const key in urlCollect) {
const urlObj = urlCollect[key];
if (urlObj.costTime < min) {
min = urlObj.costTime;
url = key;
}
}
// 返回
return url;
};
};
module.exports = Fair;
复制代码
看到这里是否是感受算法都挺简单的 🥱
期待一下模块五的实现吧😏
健康监测即对应用服务器的健康监测,为防止把请求转发到异常的应用服务器上,应使用健康监测策略。应对不一样的业务敏感程度,可相应调整策略和频率。
PORT XX unreachable
的ICMP报错信息,反之为正常。Vrtual IP
在TCP / IP架构下,全部想上网的电脑,不论以何种形式连上网络,都不须要有一个惟一的IP地址。事实上IP地址是主机硬件物理地址的一种抽象。
简单来讲地址分为两种
虚拟IP是一个未分配给真实主机的IP,也就是说对外提供的服务器的主机除了有一个真实IP还有一个虚IP,这两个IP中的任意一个均可以链接到这台主机。
虚拟IP通常用做达到高可用的目的,好比让全部项目中的数据库连接配置都是这个虚拟IP,当主服务器发生故障没法对外提供服务时,动态将这个虚IP切换到备用服务器。
好比存在主机A(192.168.1.6)和主机B(192.168.1.8)。A做为对外服务的主服务器,B做为备份机器,两台服务器之间经过HeartBeat通讯。
即主服务器会定时给备份服务器发送数据包,告知主服务器正常,当备份服务器在规定时间内没有收到主服务器的HeartBeat,会认为主服务器宕机。
此时备份服务器就升级为主服务器。
服务器B将本身的ARP缓存发送出去,告知路由器修改路由表,告知虚拟IP地址应该指向192.168.1.8.
这时外接再次访问虚拟IP的时候,机器B就会变成主服务器,而A降级为备份服务器。
这样就完成了主从机器的切换,这一切对外都是无感知、透明的。
想手动实现一下负载均衡器 / 看看源码的同窗均可以看看 👉🏻 代码仓库
编辑config.js后
npm run start
便可启动均衡器和后端服务节点
const {ALGORITHM, BASE_URL} = require("./constant");
module.exports = {
urlDesc: [
{
url: `${BASE_URL}:${16666}`,
weight: 6,
},
{
url: `${BASE_URL}:${16667}`,
weight: 1,
},
{
url: `${BASE_URL}:${16668}`,
weight: 1,
},
{
url: `${BASE_URL}:${16669}`,
weight: 1,
},
{
url: `${BASE_URL}:${16670}`,
weight: 2,
},
{
url: `${BASE_URL}:${16671}`,
weight: 1,
},
{
url: `${BASE_URL}:${16672}`,
weight: 4,
},
],
port: 8080,
algorithm: ALGORITHM.RANDOM,
workerNum: 5,
balancerNum: 5,
workerFilePath:path.resolve(__dirname, "./worker.js")
}
复制代码
初始化负载均衡统计对象balanceDataBase
运行均衡器
运行后端服务节点
const {urlDesc, balancerNum} = require("./config")
const cluster = require("cluster");
const path = require("path");
const cpusLen = require("os").cpus().length;
const {DataBase} = require("./util");
const {Worker} = require('worker_threads');
const runWorker = () => {
// 防止监听端口数 > CPU核数
const urlObjArr = urlDesc.slice(0, cpusLen);
// 初始化建立子线程
for (let i = 0; i < urlObjArr.length; i++) {
createWorkerThread(urlObjArr[i].url);
}
}
const runBalancer = () => {
// 设置子进程执行文件
cluster.setupMaster({exec: path.resolve(__dirname, "./balancer.js")});
// 初始化建立子进程
let max
if (balancerNum) {
max = balancerNum > cpusLen ? cpusLen : balancerNum
} else {
max = 1
}
for (let i = 0; i < max; i++) {
createBalancer();
}
}
// 初始化负载均衡数据统计对象
const balanceDataBase = new DataBase(urlDesc);
// 运行均衡器
runBalancer();
// 运行后端服务节点
runWorker();
复制代码
建立进程
监听进程通讯消息
监听更新响应时间事件并执行更新函数
监听获取统计对象事件并返回
监听异常退出并从新建立,进程守护。
const createBalancer = () => {
// 建立进程
const worker = cluster.fork();
worker.on("message", (msg) => {
// 监听更新响应时间事件
if (msg.type === "updateCostTime") {
balanceDataBase.updateCostTime(msg.URL, msg.costTime)
}
// 监听获取url统计对象事件并返回
if (msg.type === "getUrlCollect") {
worker.send({type: "getUrlCollect", urlCollect: balanceDataBase.urlCollect})
}
});
// 监听异常退出事件并从新建立进程
worker.on("exit", () => {
createBalancer();
});
}
复制代码
建立线程
解析须要监听的端口
向子线程通讯,发送须要监听的端口
经过线程通讯,监听子线程事件
监听异常退出并从新建立,线程守护。
const createWorkerThread = (listenUrl) => {
// 建立线程
const worker = new Worker(path.resolve(__dirname, "./workerThread.js"));
// 获取监听端口
const listenPort = listenUrl.split(":")[2];
// 向子线程发送要监听的端口号
worker.postMessage({type: "port", port: listenPort});
// 接收子线程消息统计进程被访问次数
worker.on("message", (msg) => {
// 监听链接事件并触发计数事件
if (msg.type === "connect") {
balanceDataBase.add(msg.port);
}
// 监听断开链接事件并触发计数事件
else if (msg.type === "disconnect") {
balanceDataBase.sub(msg.port);
}
});
// 监听异常退出事件并从新建立进程
worker.on("exit", () => {
createWorkerThread(listenUrl);
});
}
复制代码
获取getURL工具函数
监听请求并代理
注1:LoadBalance函数即经过算法名称返回不一样的getURL工具函数,各算法实现见模块二:常见算法
注2:getSource函数即处理参数并返回,getURL为上面讲到的获取URL工具函数。
const cpusLen = require("os").cpus().length;
const LoadBalance = require("./algorithm");
const express = require("express");
const axios = require("axios");
const app = express();
const {urlFormat, ipFormat} = require("./util");
const {ALGORITHM, BASE_URL} = require("./constant");
const {urlDesc, algorithm, port} = require("./config");
const run = () => {
// 获取转发URL工具函数
const getURL = LoadBalance(urlDesc.slice(0, cpusLen), algorithm);
// 监听请求并均衡代理
app.get("/", async (req, res) => {
// 获取须要传入的参数
const source = await getSource(req);
// 获取URL
const URL = getURL(source);
// res.redirect(302, URL) 重定向负载均衡
// 记录请求开始时间
const start = Date.now();
// 代理请求
axios.get(URL).then(async (response) => {
// 获取负载均衡统计对象并返回
const urlCollect = await getUrlCollect();
// 处理跨域
res.setHeader("Access-Control-Allow-Origin", "*");
response.data.urlCollect = urlCollect;
// 返回数据
res.send(response.data);
// 记录相应时间并更新
const costTime = Date.now() - start;
process.send({type: "updateCostTime", costTime, URL})
});
});
// 负载均衡服务器开始监听请求
app.listen(port, () => {
console.log(`Load Balance Server Running at ${BASE_URL}:${port}`);
});
};
run();
const getSource = async (req) => {
switch (algorithm) {
case ALGORITHM.IP_HASH:
return ipFormat(req);
case ALGORITHM.URL_HASH:
return urlFormat(req);
case ALGORITHM.CONSISTENT_HASH:
return urlFormat(req);
case ALGORITHM.LEAST_CONNECTIONS:
return await getUrlCollect();
case ALGORITHM.FAIR:
return await getUrlCollect();
default:
return null;
}
};
复制代码
// 获取负载均衡统计对象
const getUrlCollect = () => {
return new Promise((resolve, reject) => {
try {
process.send({type: "getUrlCollect"})
process.on("message", msg => {
if (msg.type === "getUrlCollect") {
resolve(msg.urlCollect)
}
})
} catch (e) {
reject(e)
}
})
}
复制代码
使用多线程+多进程模型,为每一个服务节点提供并发能力。
根据配置文件,建立相应数量服务节点。
const cluster = require("cluster");
const cpusLen = require("os").cpus().length;
const {parentPort} = require('worker_threads');
const {workerNum, workerFilePath} = require("./config")
if (cluster.isMaster) {
// 建立工做进程函数
const createWorker = () => {
// 建立进程
const worker = cluster.fork();
// 监听父线程消息,并转发给子进程。
parentPort.on("message", msg => {
if (msg.type === "port") {
worker.send({type: "port", port: msg.port})
}
})
// 监听子进程消息并转发给父线程
worker.on("message", msg => {
parentPort.postMessage(msg);
})
// 监听进程异常退出并从新建立
worker.on("exit", () => {
createWorker();
})
}
// 按配置建立进程,但不可大于CPU核数
let max
if (workerNum) {
max = workerNum > cpusLen ? cpusLen : workerNum
} else {
max = 1
}
for (let i = 0; i < max; i++) {
createWorker();
}
} else {
// 后端服务执行文件
require(workerFilePath)
}
复制代码
var express = require("express");
var app = express();
let port = null;
app.get("/", (req, res) => {
// 触发链接事件
process.send({type: "connect", port});
// 打印信息
console.log("HTTP Version: " + req.httpVersion);
console.log("Connection PORT Is " + port);
const msg = "Hello My PORT is " + port;
// 返回响应
res.send({msg});
// 触发断开链接事件
process.send({type: "disconnect", port});
});
// 接收主进通讯消息中的端口口并监听
process.on("message", (msg) => {
if (msg.type === "port") {
port = msg.port;
app.listen(port, () => {
console.log("Worker Listening " + port);
});
}
});
复制代码
status:任务队列状态
urlCollect:数据统计对象(提供给各算法使用 / 展现数据)
add方法
sub方法
updateCostTime方法
class DataBase {
urlCollect = {};
// 初始化
constructor (urlObj) {
urlObj.forEach((val) => {
this.urlCollect[val.url] = {
count: 0,
costTime: 0,
connection: 0,
};
});
}
//增长链接数和实时链接数
add (port) {
const url = `${BASE_URL}:${port}`;
this.urlCollect[url].count++;
this.urlCollect[url].connection++;
}
// 减小实时链接数
sub (port) {
const url = `${BASE_URL}:${port}`;
this.urlCollect[url].connection--;
}
// 更新响应时间
updateCostTime (url, time) {
this.urlCollect[url].costTime = time;
}
}
复制代码
作了个可视化图表来看均衡效果(Random)✔️
看起来均衡效果还不错🧐
想手动实现一下负载均衡器 / 看看源码的同窗均可以看看 👉🏻 代码仓库
经过cluster.isMaster判断是否为主进程,主进程不负责任务处理,只负责管理和调度工做子进程。
master主进程启动了一个TCP服务器,真正监听端口的只有这个TCP服务器。请求触发了这个TCP服务器的connection
事件后,经过句柄转发(IPC)给工做进程处理。
如何选择工做进程?
为何不直接用cluster进行负载均衡?
常见的进程间通讯方式
管道通讯
信号量
共享内存
Socket
消息队列
Node中实现IPC通道是依赖于libuv。Windows下由命名管道实现,*nix系统则采用Domain Socket实现。
表如今应用层上的进程间通讯只有简单的message事件和send()方法,接口十分简洁和消息化。
IPC管道是如何创建的?
欢迎留言讨论
Node.js非阻塞异步I/O速度快,前端扩展服务端业务?
企业实践,说明Node仍是可靠的?
Node计算密集型不友好?
Node生态不如其余成熟的语言
讨论