从零开始写一个SSR分布式缓存服务(一)

前言

公司如今全部新项目已经所有使用next.js了,最近在作性能优化,什么HTTP2GzipCache-ControlService WorkerCloudFlare,一顿操做猛如虎前端

一看战绩十比五,速度提高很多,奈斯!nginx

这就好了吗,NONONO,做为一个前端老鸟,固然要有更高的追求git

再想一想还有没有能够优化的地方,老衲掐指一算,若是每次访问都执行renderToHTML渲染页面,不但增长TTFB时间还浪费CPU,确定要优化,官方服务端缓存的例子:ssr-caching,是基于cacheable-response的,可是咱们如今都是一个站PM2开多个进程,nginx作负载均衡,线上热更新,一个进程缓存一遍,那岂不是开几个进程一样的内容缓存几回,就像这样:github

相同的内容重复缓存,浪费大量的服务器内存,而咱们想要的是这样的:web

那搭一个memcached或redis
搭redis好烦哦,我这么懒,只能本身动手写一个了
解决这个问题的思路很简单,经过客户端和服务端通讯实现缓存统一管理
废话很少说,盘他!redis

缓存

咱们先写一个基本的缓存数据库

function Cache(options) {
 "use strict";
    
    const defaults = {...};
    const conf = Object.assign({}, defaults, options);
    const timeoutIds = {};
    
    // 存储接口,好比你要存到文件,数据库等其余地方,只须要实现这三个方法
    const localStorage = conf.localStorage || {
        cache: {},
        getItem(key){
            return this.cache[key];
        },
        setItem(key, value){
            this.cache[key] = value;
        },
        removeItem(key){
            delete this.cache[key];
        }
    };
    
    // 存储接口再包装一层,处理数据类型之类的操做
    const storage = {
        get: (key, dataType="json")=>{
            let data = localStorage.getItem(key);
            return dataType === "json" ? JSON.parse(data || "null") : data;
        },
        set: (key, value)=>{
            if (typeof value !== "string") {
                value = JSON.stringify(value);
            }
            localStorage.setItem(key, value);
        },
        remove: (key)=>{
            localStorage.removeItem(key);
        }
    };
    
    // 如下是对外公开的方法
    this.set = (key, value, ttl=3600)=>{
        // 缓存是有存活时间的,因此这里须要把原始数据包装一下
        storage.set(key, {
            value: value,  // 缓存原始数据
            timestamp: new Date().getTime(),  // 缓存写入时间,用来计算是否过时
            ttl: ttl,  // 存活时间,单位秒,默认一小时
        });
        /* 缓存过时清理 清除上一个setTimeout,不然同一个key屡次set,第一个setTimeout会提早删除缓存 应以最后一次为准 */
        clearTimeout(timeoutIds[key]);
        timeoutIds[key] = setTimeout(()=>{
            this.remove(key);
        }, ttl * 1000);
    };
    
    this.get = (key, dataType="json")=>{
        let result = storage.get(key, dataType) || {};
        // 判断缓存是否过时
        if (new Date() - result.timestamp > result.ttl * 1000){
            storage.remove(key);
            return null;
        }
        return result.value;
    };
    
    this.remove = (key)=>{
        storage.remove(key);
    };
}
复制代码

用法就像这样:npm

const cache = new Cache();

let value = cache.get("your_cache_key");
if (!value){
    value = getValueFunction();
    cache.set("your_cache_key", value, 7200);
}
复制代码

服务端

接下来要写一个服务端,该咱们的WebSocket登场了(为何用WebSocket?由于它全双工,而且有现成的轮子,并且前端也比较熟悉),WebSocket的框架有不少,咱们来选一个,小公鸡点到谁我就选。。。就它了:ws(大名鼎鼎的socket.io内部的WebSocket链接就是用它实现的),首先安装wsjson

npm install ws --save
复制代码

安装成功后就能够基于ws写咱们的服务端了,缓存

const WebSocket = require("ws");

function Server(options) {
 "use strict";
    
    const defaults = {
        port: 666,                 // 端口
        verifyClient: () => true,  // 返回true表示赞成链接,返回false表示拒绝链接
    };
    const conf = Object.assign({}, defaults, options);
    const cache = new Cache(conf);
    
    // 发送消息的方法,封装这个方法的目的是防止异常致使程序崩溃
    function sendTo(client, data) {
        try {
            if (typeof data !== "string"){
                data = JSON.stringify(data);
            }
            client.send(data);
        } catch (e) {
            console.error(e);
        }
    }
    
    // 启动服务
    this.start = function () {
        return new Promise(resolve => {
            const WebSocketServer = new WebSocket.Server({
                port: conf.port,
                verifyClient: conf.verifyClient,
            });
            WebSocketServer.on("connection", function (client, request) {
                // 服务端收到消息
                client.on("message", function (message) {
                    try {
                        /* 收到消息后,执行对应的操做,并返回消息给客户端 消息结构:{ id: "...", // 消息id,用来回复消息对应的请求,后面客户端会讲到 data: { action: "get", // 要执行的操做 key: "your_cache_key", value: "your_value", ttl: 3600, } } */
                        let {id, data} = JSON.parse(message);
                        if (data.action === "get"){
                            let result = cache.get(data.key);
                            sendTo(client, {
                                id: id,
                                data: result,
                            });
                        } else if (data.action === "set"){
                            cache.set(data.key, data.value, data.ttl);
                            sendTo(client, {
                                id: id,
                                success: true,
                                message: `set cache ${data.key} success!`
                            });
                        } else if (data.action === "remove"){
                            cache.remove(data.key);
                            sendTo(client, {
                                id: id,
                                success: true,
                                message: `remove cache ${data.key} success!`
                            });
                        }
                    } catch (e) {
                        console.error("incoming message: ", message, "error: ", e);
                    }
                });
                // 用来判断链接是否断开
                client.isAlive = true;
                // 响应心跳包
                client.on("pong", function () {
                    client.isAlive = true;
                });
            });
            
            // 心跳检测
            setInterval(function () {
                WebSocketServer.clients.forEach(function (client) {
                    /* 若是这个链接的isAlive是false,说明没有响应心跳,断开链接 虽然WebSocket是长链接,但操做系统会按期检测,把不活动的链接断开 因此须要心跳机制来维持链接处于活动状态不被断开 */
                    if (client.isAlive === false) {
                        return client.terminate();
                    }
                    client.isAlive = false;
                    // 发送心跳包
                    client.ping(function () {});
                });
            }, 60 * 1000);
            
            // 监听事件
            WebSocketServer.on("listening", function () {
                console.log(`NODE_ENV: ${process.env.NODE_ENV}`);
                console.log(`WebSocket server is listening at ${conf.port}`);
                resolve();
            });
        });
    };
}
复制代码

客户端

OK,服务端完成,接下来写客户端,客户端分两块,一个是负责收发消息的Socket,另外一个是负责解析消息返回操做结果的Client,咱们先来写Socket

这里有个要注意的地方,WebSocket和HTTP不同,好比客户端发三条请求消息到服务端,服务端回复三条消息给客户端,可是客户端接收到这三条消息的顺序是否和请求顺序同样,那可就不必定了

通常的作法就是给消息加一个id,服务器返回消息带上对应id,根据id执行对应的callback,就像你去KFC吃饭,点餐下单后会给你一个取餐号,厨师把菜作好之后会叫号取餐

这里咱们须要生成惟一id,js生成惟一id的方法通常有三种:

  1. 自增数
  2. 时间戳
  3. UUID

自增数:若是网站访问量较大,天天百万PV,天天的增量就几百万,用不了几天就炸了
时间戳:假如1秒1000以上的请求,因为时间戳的最小单位是毫秒,确定会产生多个相同的时间戳
UUID : 重复的几率比你被陨石砸中的几率还要低

固然是用重复率最低的UUID了

function Socket(url, options) {
 "use strict";
    
    const defaults = {
        retry: 5,
        timeout: 30000,
        ws: {                         // ws的配置
            perMessageDeflate: true,  // 启用数据压缩
        },
    };
    const conf = Object.assign({}, defaults, options);
    
    // WebSocket Client
    let ws = {};
    // 是否链接成功
    let isConnectSuccess = false;
    // 链接状态
    let isConnected = false;
    // 对外公开的链接状态,链接状态不能被外部修改,因此要只读而且不可重定义
    Object.defineProperties(this, {
        isConnected: {
            configurable: false,
            get: ()=>{
                return isConnected;
            }
        }
    });
    
    // 等待回调的任务队列
    const queue = {};
    // 消息的惟一id,用来关联请求响应
    const getUniqueId = function () {
        return uuid();
    };
    // 响应请求的方法
    const response = function (id, data) {
        const callback = queue[id];
        if (callback){
            delete queue[id];
            if (typeof callback === "function"){
                callback(data);
            }
        }
    };

    this.connect = ()=>{
        return new Promise((resolve) => {
            try {
                if (ws.terminate){
                    ws.terminate();
                }
                // 链接服务器
                ws = new WebSocket(url, conf.ws);
                
                ws.on("open", ()=>{
                    isConnectSuccess = true;
                    isConnected = true;
                    console.log(`server is connected!`);
                    resolve(true);
                });
                
                ws.on("message", (data)=>{
                    try {
                        let result = JSON.parse(data);
                        response(result.id, result.data);
                    } catch (e) {
                        console.error(e);
                    }
                });
                
                ws.on("error", (e)=>{
                    console.error(e);
                    resolve(false);
                });
                
                ws.on("close", ()=>{
                    isConnected = false;
                    console.log(`connection is closed!`);
                    // 链接成功后意外状况致使的链接中断才须要自动重连
                    if (isConnectSuccess === true){
                        console.log(`reconnecting...`);
                        this.connect(url);
                    }
                });
            } catch (e) {
                console.error(e);
                resolve(false);
            }
        })
    };
    
    this.send = (data, callback)=>{
        const id = getUniqueId();
        queue[id] = callback;
        try {
            if (ws.readyState === WebSocket.OPEN){
                ws.send(JSON.stringify({
                    id: id,
                    data: data,
                }));
                // 若是意外状况致使服务端没有返回消息,进行超时处理
                setTimeout(()=>{
                    response(id, null);
                }, conf.timeout);
            } else {
                console.error(`server is disconnected!`);
                response(id, null);
            }
        } catch (e) {
            console.error(e);
            response(id, null);
        }
    };
}
复制代码

接下来实现客户端的基本操做就能够了

function Client(url, options) {
 "use strict";
    
    const defaults = {
        ttl: 3600,
        socket: {
            ws: {},
        },
    };
    const conf = Object.assign({}, defaults, options);
    const socket = new Socket(url, conf.socket);

    Object.defineProperties(this, {
        isConnected: {
            configurable: false,
            get: ()=>{
                return socket.isConnected;
            }
        }
    });
    
    this.connect = async () => {
        try {
            if (socket.readyState !== WebSocket.OPEN){
                await socket.connect(url);
            } else {
                console.log(`server is already connected!`);
            }
            return true;
        } catch (e) {
            return e;
        }
    };
    
    this.get = (key) => {
        return new Promise(resolve => {
            socket.send({
                action: "get",
                key: key,
            }, (result)=>{
                resolve(result.value);
            });
        });
    };
    
    this.set = (key, value, ttl) => {
        return new Promise(resolve => {
            socket.send({
                action: "set",
                key: key,
                value: value,
                ttl: ttl || conf.ttl,
            }, resolve);
        });
    };
}
复制代码

用法示例

到此,整个缓存服务基本功能就算完成了,具体怎么使用,下面举个栗子:

// server
const NextCache = require("next-cache");
const server = new NextCache.Server({
    port: 666,
});
server.start();

// client
const NextCache = require("next-cache");
const cache = new NextCache.Client("ws://localhost:666");
await cache.connect();

let value = await cache.get(`your_cache_key`);
if (!value){
    value = getValueFunction();
    await cache.set("your_cache_key", value, 7200);
}
复制代码

能够用了,不错不错

你觉得这就完了吗?

too young too naive

链接风暴

基本功能虽然完成了,但在实际应用中,这样不能把缓存的做用发挥到最大,问题的关键在这段代码:

let value = await cache.get(`your_cache_key`);
if (!value){
    value = getValueFunction();
    await cache.set("your_cache_key", value, 3600);
}
复制代码

且听老衲慢慢道来

众所周知缓存应用的场景大部分是在高访问量高并发场景下,那么上面那段代码在高并发场景会出现什么问题,假设如今有多个请求并发执行,咱们来分析一下执行过程:

// Step 1: 如今有10个线程几乎同时进来取缓存,假设如今缓存已过时
let value = await cache.get(`your_cache_key`);

// Step 2: 10个线程拿到的值都是空的
if (!value){

    // Step 3: 10个线程都执行了取值方法
    value = getValueFunction();
    
    // Step 4: 10个线程用取到的值填充缓存
    await cache.set("your_cache_key", value, 3600);
}
复制代码

缓存没有更新完以前,进来的线程都会去更新缓存,这样会大大下降缓存的命中率,致使服务器资源飙升,为了验证上面的推测,咱们来模拟高并发场景作个测试:

const server = new Server({
    port: 666
});
await server.start();

const cache = new Client(`ws://localhost:666`);
await cache.connect();

function sleep(ms) {
    return new Promise(resolve => {
        setTimeout(resolve, ms)
    })
}

// 模拟取数据的操做
async function getValue() {
    console.log(`get value`);
    await sleep(1000);
    return `your value`;
}

async function test() {
    // 填充缓存
    await cache.set("cache_key", "your value", 3);
    // 等待缓存过时
    await sleep(5 * 1000);
    // 模拟10次并发
    for (let i=0; i<10; i++){
        (async function () {
            let value = await cache.get("cache_key");
            if (!value){
                value = await getValue();
                cache.set("cache_key", value, 3600);
            }
        })();
    }
}

test();
复制代码

上面代码的运行结果:

getValue执行了10次,这就是所谓的链接风暴

强人锁男

如何解决链接风暴的问题?

答案:脏数据 +

思路就是,给线程加锁,更新缓存时,只放一个线程去更新缓存,其余线程返回旧的脏数据,缓存更新完以后进来的线程,就能拿到新数据啦

既然须要脏数据,那就不能在缓存过时后当即删除缓存,但也不能一直放着占内存,因此须要设置一个延迟删除的时间

function Server(options){
 "use strict";
    
    const defaults = {
        port: process.env.PORT || 666,
        verifyClient: () => true,
        removeDelay: 60,  // 缓存延迟删除的时间,单位秒
    };
    const conf = Object.assign({}, defaults, options);
    const cache = new Cache(conf);
    ...
}

function Cache(options){
    const defaults = {
        localStorage: null,
        removeDelay: 60,
    };
    const conf = Object.assign({}, defaults, options);
    const timeoutIds = {};
    ...
    this.get = (key, dataType)=>{
        let result = storage.get(key, dataType || "json") || {};
        if (new Date() - result.timestamp > result.ttl * TimeUnit.Second + conf.removeDelay * TimeUnit.Second){
            storage.remove(key);
            return null;
        }
        return result;
    };
    this.set = (key, value, ttl)=>{
        storage.set(key, {
            value: value,
            timestamp: new Date().getTime(),
            ttl: ttl,
        });
        clearTimeout(timeoutIds[key]);
        timeoutIds[key] = setTimeout(()=>{
            storage.remove(key);
        }, ttl * TimeUnit.Second + conf.removeDelay * TimeUnit.Second);
    };
}
复制代码

而后就是加锁了

const lock = {};
...
this.get = (key, getValue) => {
    return new Promise((resolve, reject) => {
        socket.send({
            action: "get",
            key: key,
        }, async(result)=>{
            try {
                if (!result.value || (isCacheExpired(result) && typeof getValue === "function" && !lock[key])){
                    // 第一个线程进来加锁,不让后面的线程进来
                    lock[key] = true;
                    result = await getValue();
                    if (typeof result !== "object" || "ttl" in result === false){
                        result = {value: result, ttl: conf.ttl};
                    }
                    await this.set(key, result.value, result.ttl);
                    // 释放锁
                    delete lock[key];
                }
            } catch (e) {
                // 若是更新缓存出错,也要释放锁,不然会死锁
                delete lock[key];
                reject(e);
            } finally {
                // 其余线程没进if,直接返回脏数据
                resolve(result.value);
            }
        });
    });
};
复制代码

修改以后,客户端get用法稍微有点变化:

let value = await cache.get(`cache_key`, ()=>{
    return 123;  // 默认缓存1小时
});
复制代码
let value = await cache.get(`cache_key`, async()=>{
    return {
        value: 123,
        ttl: 7200  // 缓存2小时
    }
});
复制代码

怎么样,用法是否是比之前更简单更清晰了,咱们来作一下测试,把以前的test方法改一下:

...
async function test(){
    await cache.set(`cache_key`, `your value`, 3);
    await sleep(5 * 1000);
    for (let i=0; i<10; i++){
        cache.get(`cache_key`, getValue);
    }
}

test();
复制代码

运行结果:

Congratulations!

一个简单的SSR缓存服务就大功告成了

完整代码:next-cache

不过,一个成熟的做品,还要具有安全性,分布式,高可用性,容灾备份等等

下期给你们讲安全性和分布式的实现

相关文章
相关标签/搜索