活用控制反转 -- 解决一个棘手信息传递问题

在我初学编程的时候,还没写过完整点的项目就看过了一些高阶概念。在没有实践时,这些概念的神奇和强大之处很难被彻底体会的。而一旦本身在摸索中应用了,瞬间以为打开了一扇大门,技能又提高了一个层次。控制反转(Inversion of Control)就是这些强大概念之一。一年前在 MPJ 老师的频道上了解到了,但一直没本身独立创造场景用过。直到最近在项目中遇到个坑才用起来。前端

其实控制反转或者依赖注入(这两个感受是同一个东西,看你从什么角度看)在前端框架中已经大量使用了。最先由 Angular 普及,后续的现代框架都有应用。好比 React 开发中目前最火的组件设计模式 Render Props,就是控制反转的一种应用。离开框架,在平常开发中,应用这种技巧能够帮助咱们解决不少棘手的问题,今天就讲下我在开发中的一次应用。vue

项目场景是这样的:技术栈是 Nuxt + Vuex。项目须要链接 Web Socket,而后根据 Socket 传来的数据,对 Vuex 里面相应的数据进行修改。公司为了节约成本,将 Socket 数据压缩了,并且不是全量推送,这要求前端收到数据后对数据进行解压,而后对数据进行遍历查找,更新,从新计算和排序,总之对 Socket 数据的处理很是复杂。为了避免影响性能,我把 Socket 链接和数据处理放进了 Web Worker。先来看下项目结构。webpack

下面是我封装的一个 Socket 工厂函数:web

// utils/socket.js
export default function Socket() {
  let heartBeat; // 心跳记录
  let lost = 0; // 心跳失败次数

  function decLost() {
    lost -= 1;
  }

  function connect() {
    const socket = new WebSocket("wss://xx.com");

    socket.onopen = () => {
      heartBeat = setInterval(() => {
        socket.send(2);
        lost += 1;
        if (lost === 3) {
          // 心跳失败超过 3 次,断开重连
          clearInterval(heartBeat);
          socket.close();
          connect();
        }
      }, 5000);
    };

    socket.onerror = () => {
      clearInterval(heartBeat);
      socket.close();
    };

    socket.onclose = () => {
      setTimeout(() => {
        clearInterval(heart);
        connect();
      }, 3000);
    };
    return socket;
  }
  return Object.freeze({ decLost, connect });
}

Socket 链接实现了心跳机制。onopen 以后,每隔 5 秒向服务器发送 2,并把心跳失败次数加 1;服务器收到 2 以后会返回 3,客户端收到 3 以后再把心跳失败次数减 1。工厂函数暴露的 decLost 方法是为了外部在收到 3 以后把心跳次数减 1.vuex

在 Web Worker 文件里面,调用 Socket 工厂函数,并链接 socket:编程

// workers/socket.js
import Socket from "~/utils/socket";

const socket = Socket();
const socketConnection = socket.connect();

socketConnection.onmessage = ({ data }) => {
  // 处理 socket 接收到的数据,
  // 处理完后经过 web worker 接口发出去
  postMessage(result);
};

// web worker 收到外部的数据后,把数据发给 socket
onmessage = ({ data }) => {
  socketConnection.send(data);
};

而后在一个 Nuxt 插件里,引入 socket worker,收到 worker 里传来的数据后,把数据交给 Vuex Store,反之,监听到相关 Vuex Mutation 后,把 payload 传给 worker:设计模式

// plugins/socket.js
// webpack 下导入 web worker 的方式:
import SocketWorker from "worker-loader!~/workers/socket.js";

const socketWorker = new SocketWorker();

export default ({ store }) => {
  store.subscribe((mutation, state) => {
    // 监听到相关 Vuex Mutation
  });

  socketWorker.onmessage = ({ data }) => {
    //监听 socket 发来的数据,收到数据后,
    // 经过 store.commit() 来把数据存入 vuex store
  };
};

这是我一开始写的 naive 版本,看起来主要功能实现了,并且封装和 Separation of concerns 作的也不错。写完刚跑起来,问题出现了。promise

挑战一:等 socket 链接成功后再发起订阅

当应用打开后,须要当即订阅推送数据,包括用户登陆状态下的私有数据和其它基础数据等。可是当发起订阅时,socket 可能链接成功了,也可能还没链接成功。一开始我想设置个定时器,过两秒后再发起订阅。但是想一想这种作法也太挫了。第二个思路是在 socket 链接的 onopen 事件里执行订阅。但是这样子会直接把之前的 onopen 覆盖掉,并且这样作违反了封装原则。剩下就一个办法了,等链接成功后再发请求。来看怎么作的:前端框架

// workers/socket.js
// ...
// const socketConnection ...

const waitForConnection = timeout =>
  new Promise((resolve, reject) => {
    const check = () => {
      if (socketConnection.readyState === 1) {
        resolve();
      } else if ((timeout -= 100) < 0) {
        reject("socket connection timed out");
      } else {
        setTimeout(check, 100);
      }
    };
    setTimeout(check, 100);
  });

// ... 其它细节

onmessage = async ({ data }) => {
  try {
    await waitForConnection(2000);
  } catch (e) {
    console.error(e);
    return;
  }
  socketConnection.send(data);
};

waitForConnection 函数会每隔 100 ms 检查 socket 链接状态,若是链接状态是 1(成功),则 resolve Promise,不然一直隔 100 ms 检查一次,直到链接成功或者超过指定时间。服务器

在向 socket 发送数据以前,先调用 waitForConnection,并指定最多等 2 秒,确保链接成功后再发送数据。

问题看起来解决了。奇淫技巧都用上了,让我满意了一下子。直到……

需求来了!

在 socket 断开重连后,须要续订以前的订阅。而包括用户 token 等订阅参数全都在 Vuex Store 里面。那这下头疼了,Vuex store 里面是无法知道断开重连的,而 worker 里面则根本无法读取 vuex store。知道这个需求后我心里是崩溃的,这根本无法写下去了啊!就在我都快要打算调整架构重写时,一拍脑壳灵光一闪,试试控制反转!

首先要让 Socket 工厂函数有个判断重连的机制。这个简单。

// utils/socket.js
export default function Socket() {
  let connectCount = 0; // 链接成功次数
  // ...细节,见文章前面
  socket.onopen = () => {
    // 每次链接成功,链接次数加1
    connectCount += 1;

    if (connectCount > 1) {
      // 若链接次数超过一次,则说明这次是重连
      // 在这里能够作些重连以后的操做了
    }
  };
  // ...
}

重连以后具体作什么事,这能够用依赖注入来实现。先在 worker 文件里定义要作的事情,而后在调用 Socket 工厂函数时注入方法:

// worker/socket
// 经过 postMessage 通知外部重连
const notifyReconnect = () => {
  postMessage({ type: "reconnect" });
};

const socket = Socket(notifyReconnect);

而后在 Socket 函数里接收一下:

// utils/socket.js
export default function Socket(notifyReconnect) {
  // ...细节,见文章前面
  socket.onopen = () => {
    connectCount += 1;
    if (connectCount > 1) {
      notifyReconnect();
    }
  };
  // ...
}

我觉得写到这里应该就能够了的,然而我仍是太天真了。运行后,plugins/socket 文件里能接收到重连消息,可是一直链接失败。这个问题很诡异,最后发现仍是由于我对 Web Socket 掌握的不深致使的。每次 socket 链接后,生成的链接实例都是新的。而我在 waitForConnection 方法里监听的 socketConnection 在关闭后,readyState 一直是 3(关闭状态),致使 waitForConnection 方法一直报 timeout 错误。

剩下的最后问题是每次重连,都更新链接实例。方法以下:

// worker/socket
let socketConnection;

const notifyReconnect = connection => {
  postMessage({ type: "reconnect" });
  socketConnection = connection;
};

const socket = Socket(notifyReconnect);
socketConnection = socket.connect();
// utils/socket
export default function Socket(notifyReconnect) {
  // ...细节,见文章前面
  socket.onopen = () => {
    connectCount += 1;
    if (connectCount > 1) {
      notifyReconnect(socket);
    }
  };
  // ...
}

Socket 函数在调用 notifyReconnect 时,传入最新的链接实例 socket

至此,功能都实现了。完整代码以下:

// utils/socket.js
export default function Socket(notifyReconnect) {
  let heartBeat;
  let lost = 0;
  let connectCount = 0;

  function decLost() {
    lost -= 1;
  }

  function connect() {
    const socket = new WebSocket("wss://xx.com");

    socket.onopen = () => {
      connectCount += 1;
      if (connectCount > 1) {
        notifyReconnect(socket);
      }
      heartBeat = setInterval(() => {
        socket.send(2);
        lost += 1;
        if (lost === 3) {
          clearInterval(heartBeat);
          socket.close();
          connect();
        }
      }, 5000);
    };

    socket.onerror = () => {
      clearInterval(heartBeat);
      socket.close();
    };

    socket.onclose = () => {
      setTimeout(() => {
        clearInterval(heart);
        connect();
      }, 3000);
    };
    return socket;
  }
  return Object.freeze({ decLost, connect });
}
// workers/socket.js
import Socket from "~/utils/socket";

let socketConnection;

const notifyReconnect = connection => {
  postMessage({ type: "reconnect" });
  socketConnection = connection;
};

const socket = Socket(notifyReconnect);
socketConnection = socket.connect();

const waitForConnection = timeout =>
  new Promise((resolve, reject) => {
    const check = () => {
      if (socketConnection.readyState === 1) {
        resolve();
      } else if ((timeout -= 100) < 0) {
        reject("socket connection timed out");
      } else {
        setTimeout(check, 100);
      }
    };
    setTimeout(check, 100);
  });

socketConnection.onmessage = ({ data }) => {
  // 处理完数据后经过 web worker 接口发出去
  postMessage(result);
};

onmessage = async ({ data }) => {
  try {
    await waitForConnection(2000);
  } catch (e) {
    console.error(e);
    return;
  }
  socketConnection.send(data);
};
// plugins/socket.js
import SocketWorker from "worker-loader!~/workers/socket.js";

const socketWorker = new SocketWorker();

export default ({ store }) => {
  store.subscribe((mutation, state) => {});

  socketWorker.onmessage = ({ data }) => {
    if (data.type === "reconnect") {
      socketWorker.postMessage(/* 订阅参数 */);
    }
  };
};

Edit: 测试时发现上面写法有个问题。即重连后,虽然链接实例更新了,可是 onmessage 事件没有更新,致使重连后的 socket 数据无法处理。解决办法是把重连更新链接实例的逻辑放在 plugins/socket.js 文件。Socket 工厂函数的 connect 方法返回一个 Promise,在新的链接实例的 onopen 事件里 resolve promise。这样调整以后,就用不到 waitForConnection 函数了。

相关文章
相关标签/搜索