ActionCable: WebSocket Connection

每次 HTTP 请求,都是一个 request 到 response 的过程,因此 HTTP 服务器不须要维护任何状态,html

而 WebSocket 须要 server 维护全部的链接状态,Ruby 并不很擅长处理并发。ActionCable 为了性能和易用性,用到了不少有趣的技术。java

好比 message 的 pub/sub 使用了 redis 的 pub/sub (也有其余的 adapter)。git

在并发处理上,使用了 current-ruby 的 ThreadPoolExecutor,而处理 WebSocket 的链接,使用了 nio4r 和 websocket-driver。github

下面主要介绍一下这两种技术和 I/O 模型。web

websocket-driver

websocket-driver 主要处理 WebSocket 协议,并对 I/O 层作了解耦,也就是说,能够用不一样的方式来处理 I/O 事件。同时提供了 :open,:message,:close 等一系列事件,方便使用。redis

使用 websocket-driver 的 Server-side scoket 须要实现两个方法,url 和 write(string)urlruby

url 是用户用来链接 websocket 的url。服务器

write(string) 方法用来将数据写入到 stream 内。websocket

以后,当 I/O 监听到数据的时候,再调用 WebSocket::Driver#parse 方法对数据进行解析。session

解析完成后,WebSocket 会经过 :message 事件将解析好的数据传回

websocket-driver README 给出的 example 使用的是 EventMachine 处理 I/O 操做,而 ActionCable 则直接使用了 nio4r。

I/O 模型

I/O 操做,就是在主存(main memory)和外设之间作数据的复制操做。

而外设通常都比主存慢不少,为了处理速度不匹配问题,可使用不一样的方法对 I/O 进行操做,既 I/O 模型。

同步阻塞IO(Blocking IO)

同步阻塞 I/O,即传统的 I/O 模型。

简单来讲,就是当进行 I/O 操做的时候,线程被挂起(suspend),直到 I/O 操做进行完,再执行程序以后的操做。也就是说,在进行 IO 操做的时候,线程一直处于等在状态,因此称为 Blocking IO,Ruby 提供了 IO.read 方法。

同步非阻塞IO(Non-blocking IO)

进行 IO 操做时,不管系统是否准备好数据,都直接返回。若是没有准备好,则返回 error,当用户收到 error 的时候,能够再次尝试 IO 操做。Ruby 提供了IO.read_nonblock 方法。

IO多路复用(IO Multiplexing)

经过 select 让 Kernal 挂起(suspend)当前线程,当一个或者多个 I/O 事件发生的时候,再把控制权交给应用程序。

IO多路复用适用于同时须要监听多个 IO 对象的场景。

select 的时间复杂度是 O(n),Linux 提供了更高效的版本 epoll,epoll 时间复杂度是 O(log n))。thin 提供了是否使用 epoll 的选项。Ruby 提供了 IO.select 方法。

nio4r

nio4r 能够说是 java.nio 的 Ruby 实现,但提供了更简单的接口。

nio4r 主要有两个部分:

Selectors: 用来同时监控多个 I/O 对象

Monitors: 用来追踪所关心的 I/O 事件,好比读。

require "nio"

server = TCPServer.new("127.0.0.1", 12345)

selector = NIO::Selector.new

3.times do
  client = server.accept
  _monitor = selector.register(client, :r)
end

ready = selector.select

首先是建立一个 NIO::Selector对象,而后将所关心的 I/O 事件经过 NIO::Selector#registor 注册到 selector 对象上。

最后在一个循环内,调用 NIO::Selector#select 方法,选出 ready 的事件。

在 ActionCable 中处理 I/O 的类是 Connection::StreamEventLoop,初始化 selector 在 @nio ||= NIO::Selector.new,而后由 StreamEventLoop#attach 方法将 I/O 事件注册到 nio selector 上。

最后由 run 方法来处理事件监听 run,使用 IO.read_nonblock 对 stream 进行读取操做

References

相关文章
相关标签/搜索