使用Server Sent Events制做一个代码在线运行工具

¿

最近尝试制做了一个可以在线运行代码的工具: Code-Runner, 踩坑很多, 作一个总结html

涉及到的技术/模块以下:node

  • 服务端推送技术Server Sent Events, 下文简称SSEgit

  • Node.js模块:github

    • child_process模块
    • stream模块
  • Docker几条简单的命令docker

    • docker run
    • docker pull
    • docker kill
  • Koa2: 顺便用用, 不是核心后端

  • pug: 渲染模板bash

参考服务器

阮一峰: Server-Sent Events 教程socket

优雅的终止docker容器async

1. SSE 用做服务端推送

严格地说,HTTP 协议没法作到服务器主动推送信息。可是,有一种变通方法, 就是服务器向客户端声明,接下来要发送的是流信息(streaming)。这时, 客户端 不会关闭链接, 而是会一直等着服务器发过来的新数据流。

1.1 SSE 和 WebSocket 对比

相对于WebSocket, 它有以下的特色:

  • SSE使用HTTP协议; WebSocket是一个独立的协议

  • SSE使用简单, 轻量级; WebSocket你确定要引入socket.io, ws之类的库

  • SSE默认支持断线重连; WebSocket须要本身实现

  • SSE通常用来传输文本; WebSocket默认支持传送二进制数据

若是仅仅须要服务端推送这个功能的话, 使用SSE的开发成本是最低的, 兼容性以下

SSE-兼容性

1.2 SSE 最简单接入

假设接口地址为/sse, 服务端代码(以Koa2为例)为:

const { PassThrough } = require('stream')

router.get('/sse', ctx => {
  const stream = new PassThrough()
  setInterval(() => { stream.write(`: \n\n`) }, 5000)
    
  ctx.set({ 'Content-Type':'text/event-stream' })
  ctx.body = stream
})
复制代码

客户端

const eventSource = new EventSource('/sse')
复制代码

把空行算上, 仅仅 10 行代码, 就完成了先后端的SSE链接, 效果以下

1.3 SSE 事件流格式

事件流仅仅是一个简单的文本流数据,文本应该使用UTF-8格式的编码,每条消息后面都有一由一个空行做为分隔符 以冒号,以冒号开头的行为注释行,会被忽略。

注释行能够用来防止链接超时,服务器能够按期发送一条消息注释行,以保持链接不断

这样看不够直接,代码表述以下:

// 发送注释行
stream.write(`: \n\n`)

// 发送自定义事件test, 数据为字符串: this is a test message
stream.write(`event: test\ndata: this is a test message\n\n`)

// 发送自定义事件test1, 数据为一个对象: { msg: 'this is a test message' }
stream.write(`event: test1\ndata: ${JSON.stringify({ msg: 'this is a test message' })}\n\n`)
复制代码

客户端监听自定义事件:

const eventSource = new EventSource('/sse')

eventSource.addEventListener('test', event => {
  console.log(event.data) // this is a test message
})
复制代码

没错, 就是这么简单

2. Code-Runner 具体实现

2.1 总体流程

  1. 客户端发出GET /sse HTTP/1.1请求
  • 监听自定义事件sse-connect: 得到一个身份标识id
  • 监听自定义事件sse-message: 进度消息的推送, 例如镜像拉取、代码执行开始、代码执行结束
  • 监听自定义事件sse-result: 代码执行结果的推送
  1. 用户提交代码POST /runner HTTP/1.1
  • 拉取镜像, 例如: docker pull node:latest
  • 将用户提交代码写入文件, 例如: /code/main-1.js
  • 启动容器, 例如: docker run --rm --name runner-1 -v /code:/code node:latest node /code/main-1.js
  • 根据身份标识id, 将结果写入对应的流
  • 关闭容器

2.2 SSE的封装

封装目标:

  • 能够根据身份标识id得到对应的流

  • 对发送自定义事件的封装

  • 对保持链接不断的封装

  • 维护一个实例表, 便于向对应的流推送消息

const { PassThrough } = require('stream')

const instanceMap = new Map()
let uid = 0

/** * Server Sent Events封装 */
module.exports = class SSE {
  /** * 构造函数中初始化转换流、身份标识、执行初始化方法 */
  constructor(options = {}) {
    this.stream = new PassThrough()
    this.uid = ++uid
    this.intervalTime = options.intervalTime || 5000
    this._init()
  }
  /** * 根据uid获取SSE实例 */
  static getInstance(uid) {
    return instanceMap.get(+uid)
  }
  /** * 根据uid发送自定义事件 */
  static writeStream(uid, event, data) {
    const instance = this.getInstance(uid)

    if (instance) instance.writeStream(event, data)
  }
  /** * 初始化函数中记录当前实例, 并保持长链接 */
  _init() {
    instanceMap.set(this.uid, this)

    this._writeKeepAliveStream()
    const timer = setInterval(() => { this._writeKeepAliveStream() }, this.intervalTime)

    this.stream.on('close', () => {
      clearInterval(timer)
      instanceMap.delete(this.uid)
    })
  }
  /** * 经过发送注释消息保持长链接 */
  _writeKeepAliveStream() {
    this.stream.write(': \n\n')
  }
  /** * 发送自定义事件 */
  writeStream(event, data) {
    const payload = typeof data === 'string' ? data : JSON.stringify(data)

    this.stream.write(`event: ${event}\ndata: ${payload}\n\n`)
  }
}
复制代码

封装后, /sse接口代码简化为:

router.get('/sse', ctx => {
    ctx.set({
      'Content-Type':'text/event-stream',
      'Cache-Control':'no-cache',
      'Connection': 'keep-alive'
    })
    const sse = new SSE()
    sse.writeStream('sse-connect', sse.uid)

    ctx.body = sse.stream
  })
复制代码

2.3 限制容器的使用时长

执行用户代码, 须要限制容器的使用时长, 虽然一直有Issue: 给docker run命令增长timeout选项, 可是最佳的中止容器运行的方式仍是docker stop / docker kill

docker stop: 用docker stop命令来停掉容器的时候,docker默认会容许容器中的应用程序有10秒的时间用以终止运行, 若是等待时间达到设定的超时时间,或者默认的10秒,会继续发送SIGKILL的系统信号强行kill掉进程

docker kill: 默认状况下,docker kill命令不会给容器中的应用程序有任何gracefully shutdown的机会。 它会直接发出SIGKILL的系统信号,以强行终止容器中程序的运行

所以, 此处采用docker kill更符合需求, 中止容器的代码以下:

/** * 中止Docker容器 * @description * exec方法中的timeout选项在执行“docker run”命令时无效, 所以采用“docker kill”命令来限制容器使用时长 * 经过docker kill, childProcess exitCode值为137 * @param {string} containerName 容器名称 * @param {number} timeout 限制使用时长 * @return {number} timer */
function stopDocker(containerName, timeout) {
  return setTimeout(async () => {
    try {
      await exec(`docker kill ${containerName}`)
    } catch (e) { }
  }, timeout)
}
复制代码

注: 此处child_process.exectimeout选项并不能中止docker容器

2.4 流的方式得到输出

child_process.exec方法执行命令返回的结果是buffer/string, 此处咱们须要使用流的方式, 就要使用child_process.spawn方法

容器启动的部分代码以下:

/** * 启动Docker容器并使用流模式获取输出 * @param {object} dockerOptions 启动docker的配置 */
function startDockerBySpawn(dockerOptions) {
  // 得到容器名, 镜像名, 执行命令, 挂载卷
  const { containerName, imageName, execCommand, volume } = dockerOptions
  // 参数差别
  const commandStr = `docker run --rm --memory=50m --name ${containerName} -v ${volume} ${imageName} ${execCommand}`
  const [command, ...args] = commandStr.split(' ')
  // 启动容器
  const childProcess = spawn(command, args)
  
  //...
}
复制代码

容器启动后, 能够得到两个流:

  • childProcess.stdout

  • childProcess.stderr

咱们须要把两个流的数据组合起来, 而后将其转换为SSE数据格式, 最终写入到目标流targetStream, 代码以下:

const t = new SSETransform()
const transferStation = new PassThrough()

childProcess.stdout.pipe(transferStation)
childProcess.stderr.pipe(transferStation)

transferStation.pipe(t).pipe(targetStream, { end: false })
复制代码

自定义的转换流以下:

const { Transform } = require('stream')
/** * 自定义转换流 * @description * 将child_process.stdout/stderr的可写流转换为EventStream的格式 */
module.exports = class SSETransform extends Transform {
  constructor(eventName) {
    super()
    this.eventName = eventName || 'sse-result'
  }
  _transform(chunk, encoding, callback) {
    callback(null, `event: ${this.eventName}\ndata: ${JSON.stringify({ result: chunk.toString('utf8') })}\n\n`)
  }
}
复制代码

一次代码执行流程得到的数据以下图所示

至此, 就完成了一个代码在线运行工具的开发

相关文章
相关标签/搜索