写在前面:这是一篇近一年前的草稿了,翻出来发现,关于 Task
(已更名为 Thread
)退出的一些作法仍然适用,并且 zmq.rs 0.2 不出意外也要用到,因此仍然把这篇写完贴出来备查。但请注意,文中关于 libgreen
的一些描述已不属实。html
这一篇隔的时间比较长,期间咱们的游戏在准备上线,因此也没时间写 Rust,着重在课余时间研读了各类文档、源码和 issue report——关于 Rust 的 I/O。git
Task
开始个人 zmq.rs 项目终于开始碰到网络操做了。因为 Rust 给封装出来的 I/O 接口都是相对于 Task
同步的,因此目前来看还得给每一批 I/O 操做建立一个 Task
。程序员
这里得多插一点内容了。首先是关于 ZeroMQ,它的一个 socket 是能够跟不少个别的 ZMQ socket 通信的,并且是能够经过不一样的 endpoint 来完成。好比一个 REP 服务端 socket,同时监听了 8080 和 9090 两个端口(endpoint),每一个端口可能都有数十个 REQ 客户端 socket 链接上去;更有甚者,这个 REP socket 也能够主动去链接某些客户端 socket 监听的 endpoint,上门服务。而在全部这些网络拓扑结构的上面,一个 REP socket 对程序员的接口是始终一致的,您只须要反复地从这个 REP socket 读一个数据包,而后发一个数据包就行了。github
这样一来呢,我就得给 REP socket 底层的每个 TCP socket 链接建立至少一个 Task
,以知足其并发性。以前咱们有提到,Rust 提供了 libgreen
和 libnative
两种运行时环境,对应了两种不一样的 Task
实现模型。对于 zmq.rs 来讲,基于目前的阻塞式的 I/O 接口来看,咱们颇有可能须要建立大量的 Task
——这对于 libgreen
的 M:N
模型来讲是垂手可得的,但对于 libnative
来讲倒是值得商榷的,由于 1:1
的模型意味着咱们将会用大量操做系统的线程来微操极少许的 ZMQ socket,这对于追求高并发的 ZMQ 也许并非一个好主意——虽然 Rust 的 Task
模型能极大地避免常规多线程编程中最糟心的那一部分,可是内存占用会不会过高(哈!高!-2015),上下文切换的代价会不会太大等问题还有待于进一步测试。若是结果不理想,也许每批 I/O 一个 Task
的这种设计就要被推倒,新的设计将须要 Rust 提供异步的 I/O 接口。(0.2 确实将这么改 -2015)web
回到原来的话题。由于建立了好多 Task
,必定会碰到的问题就是怎么结束它们,因此我一上来就打算先看一眼这个问题。没想到这一看,看出了好多问题。编程
由于受 Python greenlet 的严重影响,我天然而然地觉得,结束一个 Task
应该用 kill()
——对于一个暂停状态的微线程,扔进去一个 GreenletExit
异常是多么正常的一种方式。但是 Rust 不那么认为。我也想到了因为须要支持 libnative
,中止一个阻塞在 I/O 调用中的线程绝非扔一个 GreenletExit
那么简单,但我仍是义无反顾地去搜各类 rust task kill terminate shutdown 之类的关键词。网络
结果逐步明朗,原来 Rust 在 0.9 以前确实有过 Task
的 kill
功能,是经过 supervisor 模型实现的——即一对 Task
,任何一个挂掉都会致使另外一个挂掉。可是呢,因为一些缘由这个功能被砍掉了,也就是说,在 Rust 里,一个 Task
只能从内部抛错误死掉,没有办法从外部直接杀掉。另外,这个还(竟然!)致使了个人第一个 stackoverflow 回答。多线程
Task
既然没法从别的 Task
中主动杀掉一个 Task
,那么就想办法让这个 Task
自杀。一个长时间运行的 Task
一般处于两种状态:一、执行代码;二、等待事件。并发
对于一个正在执行代码的 Task
,咱们是没法让 Task
本身突然想到该结束了而后戛然而止。只有在某些特殊状况下,咱们才能手工写一些代码,让程序执行一段代码以后,去检查一下是否应该结束了,好比在一个死循环里:异步
rustwhile self.running { // Do everything else }
而大多数其余状况下,从事件等待中跳出来结束一个 Task
更为常见。这里也分两种状况:A、等待 I/O 事件;B、等待 channel
事件。两种状况处理都比较简单,A 的话就给调用加一个稍短的超时,而后重复前面的那个例子,好比:
rusttcp_stream.set_read_timeout(Some(1000)); while self.running { let result = tcp_stream.read_byte(); // Do the rest }
而对于等待一个 channel
的 Task
来讲就更容易了,只要 channel
的另外一端销毁了,这个等待的调用就会自动结束,只须要正确处理调用结果就行了。
其实,上面两个例子中的 self.running
应(至少)为一个 Arc
,由于须要从别的 Task
中来设置这个值。这里还有一种也是用 channel
的处理方式,就是在每次循环的开始处,向一个链接到父 Task
的 channel
的 Sender
端,发送一个空白消息,这样若是另外一端已经销毁了,发送会失败,也就意味着咱们该退出这个 Task
了,好比这样:
rustlet (tx, rx) = channel(); // ... in task let mut a = TcpListener::bind("127.0.0.1:8482").listen().unwrap(); a.set_timeout(Some(1000)); loop { match a.accept() { Ok(s) => { tx.send(Some(s)); } Err(ref e) if e.kind == TimedOut => { tx.send(None).unwrap(); } Err(e) => println!("err: {}", e), // something else } }