模块地址: https://github.com/netwarps/libp2p-rs/blob/master/core/src/peerstore.rsgit
在rust-libp2p中,当协议想要获取peer_id所对应的地址时,须要实现NetworkBehaviour的addresses_of_peer方法。与之不一样的是,go-libp2p使用peerstore来存储了peer_id与address之间的关系,所以咱们能够参考它来实现一个rust版本的peerstore。github
实现构想
首先,因为咱们的启动核心是swarm,那peerstore会做为其中的一个属性。其次,在go-libp2p-peerstore中,peerstore主要存放的数据为三块:地址信息的AddrBook,公私钥信息的KeyBook,协议信息的ProtoBook;咱们能够将三者结合在一块儿组成一个新的struct,取名为PeerRecord,放在以peer_id为key的Hashmap中。json
垃圾回收机制
GC存在的意义就是防止hashmap过分膨胀。因为网络状态时时刻刻都在发生变化,peer之间的链接也可能随之变化,而peerstore有一个重要的做用就是存放peer的地址信息。若是不对已经失效的peer信息进行清理,就会影响到peerstore的工做效率。安全
目前实现的效果是,在swarm的start方法中使用task::spawn启动一个任务,建立一个mpsc的channel,使用select语法等待管道传来的消息或者 task 等待10分钟的逻辑完成,针对某些地址,若是已经超出ttl的限制,清理当前地址;同时,若是当前peer_id的地址集合中不存在任何的地址信息,就将其从Hashmap中移除。网络
Pinned
虽然GC机制的存在,使Hashmap不会无限制扩容,良好地帮助了系统的运行。可是仍然有些不足的地方,考虑以下这种状况:并发
对于KAD协议来讲,peer须要不停地进行迭代查询已知节点,逐步填充本身的KBucket,这是一个耗时的过程。若是在gc时,将较早查询到的节点地址信息从peerstore中移除了,那么又须要从新启用迭代查询去获取地址,所以咱们在PeerRecord中添加了一个bool值pinned。GC时会判断这条记录的类别,若是pinned为true,就会跳过清理的步骤。async
序列化与持久化
对每个peer来讲,由于某些缘由须要下线或停机时,存放在peerstore里的节点信息是不该该被丢弃的。而对于须要持久化的数据,也须要进行序列化操做,便于存放。分布式
在libp2p-rs中,主循环的调用也是经过task::spawn启动的。当swarm接收到close的消息时,将会退出事件处理的循环,并向运行peerstore的gc线程发送一个close()的事件,结束gc的过程。接下来调用peerstore的save_data()方法,将数据使用serde序列化成json格式,并使用std::io将序列化后的数据存放到根目录的txt文件中。oop
方法分析
以GC方法进行解析:区块链
- swarm主循环spawn运行task,每十分钟触发一次select。
- Hashmap被Arc<Mutex>包裹,能够经过lock()获取,保证并发安全。
- 若是该peer的信息不是经过kad获取的,调用retain筛选未超出ttl时限的地址。
- 若是当前peer的地址数据已经清空,从hashmap中移除这个peer。
// swarm/lib.rs // The GC task is to remove all expired addresses from the peer store task::spawn(async move { log::info!("starting Peerstore GC..."); loop { let either = future::select(rx.next(), task::sleep(PEERSTORE_GC_PURGE_INTERVAL).boxed()).await; match either { Either::Left((_, _)) => break, Either::Right((_, _)) => peer_store.remove_expired_addrs(), } } log::info!("quitting Peerstore GC..."); }); // core/peerstore.rs /// Removes all expired address. pub fn remove_expired_addrs(&self) { let mut to_remove = vec![]; let mut guard = self.inner.lock().unwrap(); for (peer, pr) in guard.iter_mut() { if !pr.pinned { log::debug!("GC attempt for {:?}", peer); pr.addrs.retain(|record| record.expiry.elapsed() < record.ttl); // delete this peer if no addr at all if pr.addrs.is_empty() { log::debug!("remove {:?} from peerstore", peer); to_remove.push(peer.clone()); } } } for peer in to_remove { guard.remove(&peer); } }
Netwarps 由国内资深的云计算和分布式技术开发团队组成,该团队在金融、电力、通讯及互联网行业有很是丰富的落地经验。Netwarps 目前在深圳、北京均设立了研发中心,团队规模30+,其中大部分为具有十年以上开发经验的技术人员,分别来自互联网、金融、云计算、区块链以及科研机构等专业领域。 Netwarps 专一于安全存储技术产品的研发与应用,主要产品有去中心化文件系统(DFS)、去中心化计算平台(DCP),致力于提供基于去中心化网络技术实现的分布式存储和分布式计算平台,具备高可用、低功耗和低网络的技术特色,适用于物联网、工业互联网等场景。 公众号:Netwarps