做者:谢敬伟,江湖人称“刀哥”,20年IT老兵,数据通讯网络专家,电信网络架构师,目前任Netwarps开发总监。刀哥在操做系统、网络编程、高并发、高吞吐、高可用性等领域有多年的实践经验,并对网络及编程等方面的新技术有浓厚的兴趣。git
Rust
历史不长,仍然处于快速发展的历程中。关于异步编程的模式,如今已经发展到async/await
协程的高级阶段。大概是由于async/await
出现的时间还不长,因此现有大多数的开源项目并非或不是纯粹使用async/await
来书写的,而是前先后后有多种的写法。这样的情况给Rust
的学习带来了一些的难度。在这里,咱们来捋一捋异步代码的几种写法。程序员
最原始的方式是使用mio
进行开发。mio
是一个底层异步I/O
库,提供非阻塞方式的API
,具备很高的性能。实际上mio
是对于操做系统epoll/kqueue/IOCP
的封装。在C/C++
中咱们使用libevent
之类的库,mio
能够理解为对应的Rust
版本。基于mio
的代码大体以下:编程
loop { // Poll Mio for events, blocking until we get an event. poll.poll(&mut events, None)?; // Process each event. for event in events.iter() { if event.is_writable() { // socket可写,开始发送数据 } if event.is_readable() { // socket可读,开始接收数据 } // socket 关闭,退出循环 return Ok(()); } }
总的来讲,这是彻底基于异步事件通知的写法,和C/C++
区别不是很大,异步代码对于程序员是一个挑战,当代码逻辑愈来愈复杂,添加新功能或是解决已有问题的难度也愈来愈大。安全
另外,mio
实现的是一个单线程事件循环,虽然能够处理成千上万路的I/O
操做,但没有多线程的能力,须要本身扩充。微信
为了更好地规范异步的逻辑,Rust
抽象出Future
表示还没有发生的事物。这些Future
能够用不少方式組合成一个更复杂的复合Future
来表明一系列的事件。Future
须要程序主动去poll
(轮询)才能获取到最终的结果,每一次轮询的结果多是Ready
或者Pending
。网络
运行库提供Executor
和Reactor
来执行Future
,也就是调用Future
的poll
方法循环执行一系列就绪的Future
,当Future
返回Pending
的时候,会将Future
转移到Reactor
上等待唤醒。Reactor
被用来负责唤醒以前没法完成的Future
。事实上,tokio
的Reactor
是基于mio
实现的,而async-std/smol
则是封装了epoll/kqueue/IOCP
,提供相似的功能。多线程
手动实现Future
是一件相对繁琐的工做,主要的问题在于异步模式自己的特性。例如,接收网络数据,没法臆测每次轮询会收到多少字节的数据,每每须要开辟一段接收缓冲区容纳数据,协议解码也须要一个状态机拼包向上层提交;发送网络数据存在类似问题,发送数据时底层未就绪,则缓冲发送数据,待下次轮询时,须要首先检查并处理发送缓冲区。另外还有一些值得注意的地方,若是手动实现的Future
返回Pending
,则必须本身实现唤醒机制,也就是须要将cx
克隆一份记下来,而后在适当的时侯调用cx.wake()
。由于网络相关的功能每每是分层的,所以手动的Poll
循环也会是层层堆叠的,这时候,返回值Poll::Ready(T)
就有学问了。泛型T可能包裹各类不一样的数据,Option<T>
,Result<T,E>
,或者二者的组合。由于最外层还有一个Poll<T>
,全部这时候的match
语句写起来会很是臃肿,粘贴复制写不少代码,完成的功能却很是有限,并且因为这些代码很类似,大大增长了出错的可能性。架构
标准库中仅仅定义了Future
,更多的相关功能须要引用futures-rs
类库,里面定义了一系列有关异步的操做,包括Stream
、Sink
、AsyncRead
、AsyncWrite
等基础Trait
,以及对应实现了大量方便操做的组合子的Ext Trait
,特别用途的fused
、Box
,Try
系列的扩展,诸如join!
、select!
、pin_mut!
等一系列的宏。理论上,不使用这些扩展也能写出代码,只不过那样的代码极可能篇幅会长的可怕。值得一提的是,除了一些能够简化代码的过程宏以外,扩展Trait
提供的组合子也会让代码精简很多。好比Future::and_then
可让代码写成链式调用的方式;Sink::send
包装了Sink
发送三步骤 poll_ready/start_send/poll_flush
,使用.await
一行代码直接就能够完成发送。所以,不少poll
方式的代码其实是准确地说是混合式的,其中也使用了很多async
代码块。并发
总之,搞清楚Future
相关的这些内容是须要花费很多时间,更不用说用它们来写代码了。不过,即使是使用async/await
这种更高级原语,也是有必要了解底层的工做原理和实现机制,所谓知其然知其因此然。异步
使用async/await
能够将异步的代码写得相似同步的过程,更加符合人体工程学。由于async
被翻译为一个Future
状态机,原先在poll
方式中须要处理的与Pending
相关的状态如今都由async
生成的状态机自动完成,所以大大减轻了程序员的心智负担。
如前所述,底层的Futures
提供了不少方便的组合子扩展Future
,使用起来很简洁,能够极大地简化代码。例如,上文提到过的Sink::send
包装了发送缓冲区的实现和异步发送的三个步骤;AsyncRead::read_exact
实现了读取指定字节数的功能,在处理网络协议解析时能够避免手写一个拼包状态机;AsyncWrite::write_all
实现了发送所有数据以及发送缓冲,等等。正是在这些底层功能的支持下,async/await
成为了更高级的书写异步代码的方式。也许会有少量担忧,这样所谓“高级”会不会在性能上有很大损失?笔者我的不这么认为。自动实现的状态机也许未必比程序员手动完成的性能更差。状态机编程对于任何人,即使是一个有经验的程序员都是不小挑战。蹩脚的状态机实现不只可能有性能问题,更大的风险来自于实现上的漏洞,以及维护上的困难。代码写出来更可能是给别人看的,完成一样的功能,简洁的代码更有多是更高质量的代码。
如下例子是固定长度分割的报文接收过程,使用async/await
是很简单的。若是实现为一个Stream/poll_next
,代码会复杂不少。
/// convenient method for reading a whole frame pub async fn recv_frame(&mut self) -> io::Result<Vec<u8>> { let mut len = [0; 4]; let _ = self.inner.read_exact(&mut len).await?; // inner socket, 支持 AsyncRead let n = u32::from_be_bytes(len) as usize; if n > self.max_frame_len { let msg = format!( "data length {} exceeds allowed maximum {}", n, self.max_frame_len ); return Err(io::Error::new(io::ErrorKind::PermissionDenied, msg)); } let mut frame = vec![0; n]; self.inner.read_exact(&mut frame).await?; Ok(frame) }
最后,彻底使用async/await
写代码目前还有几个问题:
当前Trait
不支持 async fn
,没法直接用Trait
来抽象异步方法。暂时解决办法是使用三方库 async-trait
。以下:
use async_trait::async_trait; #[async_trait] trait Advertisement { async fn run(&self); }
宏 async_trait
将代码转换为一个返回 Pin<Box<dyn Future + Send + 'async>>
的同步方法。由于装箱和动态派发的缘由,性能上会有少量损失。
当前drop
方法必须是同步调用,不能使用await
语法。当一个I/O
对象越过生命周期被析构,每每在关闭底层句柄以前,还须要完成某些I/O
操做。好比,通知网络对端链接已经关闭。在同步代码中,咱们只须要在drop()
中置入这些操做,可是在异步代码中,没法在drop()
中作相似的事情。
解决办法,老是在异步I/O
对象越过生命周期以前显式地执行关闭动做,或是,实现一个相似GC
的功能,专门负责清理工做。
笔者在学习Rust
过程当中,主要关注网络相关的并发编程。由于以前有在Go
版本的ipfs/libp2p
上的开发经验,故而学习研究了rust-libp2p
以及nervos tentacle
。rust-libp2p
是Parity
实现的准官方版本,可是这个项目的代码及其难懂,过于强调使用泛型参数的抽象,致使代码可读性很是差。请教了代码做者,他认可代码可能有些复杂,但也强调都是有缘由的... nervos tentacle
的实如今协议上不够完整,特别是与标准libp2p
并不兼容。两个项目共有的特色是主要用poll
的方式写代码,逻辑上都是状态机的嵌套。
所以,笔者试图彻底使用async/await
方式重构libp2p
,参考rust-libp2p
的实现,代码协程化,向上层提供纯粹的异步接口,争取在API
层面的体验接近go-libp2p
,这是推广Rust
协程机制的一个尝试,同时也是我的的一个学习的过程。目前刚刚起步,仅完成了secio
与yamux
部分,待合适时机开源,指望更多Rust
爱好者共同来开发完善。
深圳星链网科科技有限公司(Netwarps),专一于互联网安全存储领域技术的研发与应用,是先进的安全存储基础设施提供商,主要产品有去中心化文件系统(DFS)、企业联盟链平台(EAC)、区块链操做系统(BOS)。微信公众号:Netwarps