Rust 语言学习笔记(四)—— I/O

写在前面:这是一篇近一年前的草稿了,翻出来发现,关于 Task(已更名为 Thread)退出的一些作法仍然适用,并且 zmq.rs 0.2 不出意外也要用到,因此仍然把这篇写完贴出来备查。但请注意,文中关于 libgreen 的一些描述已不属实。html

这一篇隔的时间比较长,期间咱们的游戏在准备上线,因此也没时间写 Rust,着重在课余时间研读了各类文档、源码和 issue report——关于 Rust 的 I/O。git

从试图杀掉一个 Task 开始

个人 zmq.rs 项目终于开始碰到网络操做了。因为 Rust 给封装出来的 I/O 接口都是相对于 Task 同步的,因此目前来看还得给每一批 I/O 操做建立一个 Task程序员

这里得多插一点内容了。首先是关于 ZeroMQ,它的一个 socket 是能够跟不少个别的 ZMQ socket 通信的,并且是能够经过不一样的 endpoint 来完成。好比一个 REP 服务端 socket,同时监听了 8080 和 9090 两个端口(endpoint),每一个端口可能都有数十个 REQ 客户端 socket 链接上去;更有甚者,这个 REP socket 也能够主动去链接某些客户端 socket 监听的 endpoint,上门服务。而在全部这些网络拓扑结构的上面,一个 REP socket 对程序员的接口是始终一致的,您只须要反复地从这个 REP socket 读一个数据包,而后发一个数据包就行了。github

这样一来呢,我就得给 REP socket 底层的每个 TCP socket 链接建立至少一个 Task,以知足其并发性。以前咱们有提到,Rust 提供了 libgreenlibnative 两种运行时环境,对应了两种不一样的 Task 实现模型。对于 zmq.rs 来讲,基于目前的阻塞式的 I/O 接口来看,咱们颇有可能须要建立大量的 Task——这对于 libgreenM:N 模型来讲是垂手可得的,但对于 libnative 来讲倒是值得商榷的,由于 1:1 的模型意味着咱们将会用大量操做系统的线程来微操极少许的 ZMQ socket,这对于追求高并发的 ZMQ 也许并非一个好主意——虽然 Rust 的 Task 模型能极大地避免常规多线程编程中最糟心的那一部分,可是内存占用会不会过高(哈!高!-2015),上下文切换的代价会不会太大等问题还有待于进一步测试。若是结果不理想,也许每批 I/O 一个 Task 的这种设计就要被推倒,新的设计将须要 Rust 提供异步的 I/O 接口。(0.2 确实将这么改 -2015web

回到原来的话题。由于建立了好多 Task,必定会碰到的问题就是怎么结束它们,因此我一上来就打算先看一眼这个问题。没想到这一看,看出了好多问题。编程

由于受 Python greenlet 的严重影响,我天然而然地觉得,结束一个 Task 应该用 kill()——对于一个暂停状态的微线程,扔进去一个 GreenletExit 异常是多么正常的一种方式。但是 Rust 不那么认为。我也想到了因为须要支持 libnative,中止一个阻塞在 I/O 调用中的线程绝非扔一个 GreenletExit 那么简单,但我仍是义无反顾地去搜各类 rust task kill terminate shutdown 之类的关键词。网络

结果逐步明朗,原来 Rust 在 0.9 以前确实有过 Taskkill 功能,是经过 supervisor 模型实现的——即一对 Task,任何一个挂掉都会致使另外一个挂掉。可是呢,因为一些缘由这个功能被砍掉了,也就是说,在 Rust 里,一个 Task 只能从内部抛错误死掉,没有办法从外部直接杀掉。另外,这个还(竟然!)致使了个人第一个 stackoverflow 回答多线程

正确结束一个 Task

既然没法从别的 Task 中主动杀掉一个 Task,那么就想办法让这个 Task 自杀。一个长时间运行的 Task 一般处于两种状态:一、执行代码;二、等待事件。并发

对于一个正在执行代码的 Task,咱们是没法让 Task 本身突然想到该结束了而后戛然而止。只有在某些特殊状况下,咱们才能手工写一些代码,让程序执行一段代码以后,去检查一下是否应该结束了,好比在一个死循环里:异步

rustwhile self.running {
    // Do everything else
}

而大多数其余状况下,从事件等待中跳出来结束一个 Task 更为常见。这里也分两种状况:A、等待 I/O 事件;B、等待 channel 事件。两种状况处理都比较简单,A 的话就给调用加一个稍短的超时,而后重复前面的那个例子,好比:

rusttcp_stream.set_read_timeout(Some(1000));
while self.running {
    let result = tcp_stream.read_byte();
    // Do the rest
}

而对于等待一个 channelTask 来讲就更容易了,只要 channel 的另外一端销毁了,这个等待的调用就会自动结束,只须要正确处理调用结果就行了。

其实,上面两个例子中的 self.running 应(至少)为一个 Arc,由于须要从别的 Task 中来设置这个值。这里还有一种也是用 channel 的处理方式,就是在每次循环的开始处,向一个链接到父 TaskchannelSender 端,发送一个空白消息,这样若是另外一端已经销毁了,发送会失败,也就意味着咱们该退出这个 Task 了,好比这样:

rustlet (tx, rx) = channel();

// ... in task
let mut a = TcpListener::bind("127.0.0.1:8482").listen().unwrap();
a.set_timeout(Some(1000));
loop {
    match a.accept() {
        Ok(s) => { tx.send(Some(s)); }
        Err(ref e) if e.kind == TimedOut => { tx.send(None).unwrap(); }
        Err(e) => println!("err: {}", e), // something else
    }
}
相关文章
相关标签/搜索