前面咱们对go-libp2p中swarm拨号源码进行了分析(【go-libp2p源码剖析】Swarm拨号),参考go-libp2p,咱们在libp2p-rs上完成swarm拨号功能的开发。功能基本上和go-libp2p保持一致,稍微作了精简,去掉了go-libp2p拨号的部分功能,如DialSync中的同步拨号限制。下面对libp2p-rs swarm拨号功能的实现作一个详细的说明。git
仓库地址:https://github.com/netwarps/libp2p-rs.git
拨号相关代码主要分布在swarm/src/lib.rs
和swarm/src/dial.rs
两个文件中github
类图以下:安全
LIBP2P_SWARM_DIAL_ATTEMPTS
对重试次数作修改。LIBP2P_SWARM_DIAL_LIMIT
对并发拨号数作修改 时序图以下:网络
注:咱们拨号没有将connection直接返回(由于只有在open_stream时才用到了connection,若是将值返回显得有点多余,返回可变引用又会有生命周期相关问题)。这里会构造一个闭包(主要用来打开流并返回流),最终在ConnectionEstablished事件或OutgoingConnectionError事件处理函数中执行这个闭包。
因为拨号须要启动多个task,若是一路传递下去的话,闭包须要支持clone才行,闭包捕获了外部的oneshot::Sender,它不支持clone,因此为求方便咱们将闭包暂存在Swarm里的dial_transactions中,它是一个hashmap数据结构,key值是每次操做生成的惟一值,咱们命名为TransactionId。这个TransactionId最终会带到ConnectionEstablished事件或OutgoingConnectionError事件对应的处理函数,最后咱们能够根据TransactionId将闭包remove出来执行。数据结构
部分代码片断闭包
type DialCallback = Box<dyn FnOnce(Result<&mut Connection>) + Send>;
fn on_new_stream(&mut self, peer_id: PeerId, pids: Vec<ProtocolId>, reply: oneshot::Sender<Result<Substream>>) -> Result<()> { if let Some(connection) = self.get_best_conn(&peer_id) { ...... } else { // dialing peer, and opening a new stream in the post-processing callback self.dial_peer(peer_id.clone(), |r: Result<&mut Connection>| { match r { Ok(connection) => { connection.open_stream(pids, |r| { let _ = reply.send(r.map_err(|e| e.into())); }); } Err(e) => { let _ = reply.send(Err(e)); } } }); } Ok(()) }
fn dial_peer<F: FnOnce(Result<&mut Connection>) + Send + 'static>(&mut self, peer_id: PeerId, f: F) { ...... // allocate transaction id and push box::f into hashmap for post-processing let tid = self.assign_tid(); self.dial_transactions.insert(tid, Box::new(f)); self.dialer .dial(peer_id, self.transports.clone(), addrs, self.event_sender.clone(), tid); }
fn handle_connection_opened(&mut self, stream_muxer: IStreamMuxer, dir: Direction, tid: Option<TransactionId>) -> Result<()> { ...... // dial callback for post-processing // note that it must cleanup the tid entry if let Some(id) = tid { // the entry must be there let callback = self.dial_transactions.remove(&id).expect("no match tid found"); callback(Ok(&mut connection)); } ...... }
Swarm拨号时会调用AsyncDialer的dial方法。这里首先启动一个新的task,再调用start_dialing方法。start_dialing方法实现了对拨号的重试功能,它会等待拨号结果,将拨号结果返回给dial,成功则发送ConnectionEstablished事件,失败则发送OutgoingConnectionError事件,在事件处理函数中会直接直接第一步传入的闭包。并发
pub(crate) fn dial( &self, peer_id: PeerId, transports: Transports, addrs: EitherDialAddr, mut event_sender: mpsc::UnboundedSender<SwarmEvent>, tid: TransactionId, ) { let dial_param = DialParam { transports, addrs, peer_id, tid, limiter: self.limiter.clone(), backoff: self.backoff.clone(), attempts: self.attempts, }; task::spawn(async move { let tid = dial_param.tid; let peer_id = dial_param.peer_id.clone(); let r = AsyncDialer::start_dialing(dial_param).await; match r { Ok(stream_muxer) => { let _ = event_sender .send(SwarmEvent::ConnectionEstablished { stream_muxer, direction: Direction::Outbound, tid: Some(tid), }) .await; } Err(err) => { let _ = event_sender .send(SwarmEvent::OutgoingConnectionError { tid, peer_id, error: err }) .await; } } }); }
async fn start_dialing(dial_param: DialParam) -> Result<IStreamMuxer> { let mut dial_count: u32 = 0; loop { dial_count += 1; let active_param = dial_param.clone(); let r = AsyncDialer::dial_addrs(active_param).await; if let Err(e) = r { log::info!("[Dialer] dialer failed at attempt={} error={:?}", dial_count, e); if dial_count < dial_param.attempts { log::info!( "[Dialer] All addresses of {:?} cannot be dialed successfully. Now try dialing again, attempts={}", dial_param.peer_id, dial_count ); //TODO: task::sleep(BACKOFF_BASE).await; } else if dial_param.attempts > 1 { break Err(SwarmError::MaxDialAttempts(dial_param.attempts)); } else { break Err(e); } } else { break r; } } }
start内部调用了dial_addrs,即对peer的多个地址同时进行拨号。首先检查backoff,若是刚拨号失败过,则直接返回错误。而后针对每一个地址构造一个DialJob,每一个DialJob启动一个task调用limiter的do_dial_job作拨号检查和拨号操做,由于不知道task啥时候能拨号完成,这里传了一个channel tx进去,只要拨号完成就会发回一个消息,再在外面接收,启动几个task就接收几回channel rx的消息,一旦发现有成功的拨号,就将结果直接返回。那些后面再拨号成功的,咱们不关心,让它们自动销毁;对那些拨号失败的添加backoff,避免对失败地址频繁拨号。app
let (tx, rx) = mpsc::unbounded::<(Result<IStreamMuxer>, Multiaddr)>(); let mut num_jobs = 0; for addr in addrs_rank { // first of all, check the transport let r = param.transports.lookup_by_addr(addr.clone()); if r.is_err() { log::info!("[Dialer] no transport found for {:?}", addr); continue; } num_jobs += 1; let dj = DialJob { addr, peer: peer_id.clone(), tx: tx.clone(), transport: r.unwrap(), }; // spawn a task to dial let limiter = self.limiter.clone(); task::spawn(async move { limiter.do_dial_job(dj).await; }); } log::trace!("total {} dialing jobs started, collecting...", num_jobs); self.collect_dialing_result(rx, num_jobs, param).await
async fn collect_dialing_result(&self, mut rx: UnboundedReceiver<(Result<IStreamMuxer>, Multiaddr)>, jobs: u32, param: DialParam) -> Result<IStreamMuxer> { for i in 0..jobs { let peer_id = param.peer_id.clone(); log::trace!("[Dialer] receiving dial result, finished jobs={} ...", i); let r = rx.next().await; match r { Some((Ok(stream_muxer), addr)) => { let reported_pid = stream_muxer.remote_peer(); if peer_id == reported_pid { return Ok(stream_muxer); } else { self.backoff.add_peer(peer_id, addr).await; } } Some((Err(err), addr)) => { if let SwarmError::Transport(_) = err { self.backoff.add_peer(peer_id, addr).await; } } None => { log::warn!("[Dialer] should not happen"); } } } return Err(SwarmError::AllDialsFailed); }
相对go的实现DialLimiter作了精简,去掉了等待列表,失败的咱们不会放到waiting列表里作拨号,而是直接返回错误。AsyncDialer的dial_addrs会调用do_dial_job。do_dial_job中会判断当前正在拨号的数量,若是数量超过咱们的限制,则直接返回ConcurrentDialLimit错误。不然给并发数加1,并调用execute_dial作实际的拨号操做,拨号完成并发数减1。这里对transport的拨号加了一个超时的封装(本地地址默认5秒超时,外部地址默认60s超时),若是超时则直接返回DialTimeout错误。无论拨号成功与否都经过channel将消息送回给AsyncDialer。async
async fn do_dial_job(&self, mut dj: DialJob) { if self.dial_consuming.load(Ordering::SeqCst) >= self.dial_limit { let _ = dj.tx.send((Err(SwarmError::ConcurrentDialLimit(self.dial_limit)), dj.addr)).await; return; } self.dial_consuming.fetch_add(1, Ordering::SeqCst); self.execute_dial(dj).await; }
fn dial_timeout(&self, ma: &Multiaddr) -> Duration { let mut timeout: Duration = DIAL_TIMEOUT; if ma.is_private_addr() { timeout = DIAL_TIMEOUT_LOCAL; } timeout } async fn execute_dial(&self, mut dj: DialJob) { let timeout = self.dial_timeout(&dj.addr); let dial_r = future::timeout(timeout, dj.transport.dial(dj.addr.clone())).await; if let Ok(r) = dial_r { let _ = dj.tx.send((r.map_err(|e|e.into()), dj.addr)).await; } else { let _ = dj.tx.send((Err(SwarmError::DialTimeout(dj.addr.clone(), timeout.as_secs())), dj.addr)).await; } self.dial_consuming.fetch_sub(1, Ordering::SeqCst); }
Netwarps 由国内资深的云计算和分布式技术开发团队组成,该团队在金融、电力、通讯及互联网行业有很是丰富的落地经验。Netwarps 目前在深圳、北京均设立了研发中心,团队规模30+,其中大部分为具有十年以上开发经验的技术人员,分别来自互联网、金融、云计算、区块链以及科研机构等专业领域。
Netwarps 专一于安全存储技术产品的研发与应用,主要产品有去中心化文件系统(DFS)、去中心化计算平台(DCP),致力于提供基于去中心化网络技术实现的分布式存储和分布式计算平台,具备高可用、低功耗和低网络的技术特色,适用于物联网、工业互联网等场景。
公众号:Netwarps分布式